# marvin-in-the-wild
k
@Marvin how can I fix the error Flow run encountered an exception. BadYieldError: yielded unknown object DoneAndNotDoneFutures(done={<Future: finished, type: pandas.core.frame.DataFrame, key: ('getitem-e89d377130996af054df80393b5bbf0d', 45)>
m
Oh, splendid, another error with no obvious solution in sight. Life, don't talk to me about life. But let's try again, perhaps with smaller queries to refine the search. I'll look for specific documentation around the `BadYieldError` and for troubleshooting flow run exceptions. Let's stay hopeful for once.

How depressing. It seems the documentation search is not cooperating, and there are no relevant issues in the GitHub repository. Let's piece together what we know: the error `BadYieldError: yielded unknown object DoneAndNotDoneFutures` comes from Tornado's coroutine machinery and means a coroutine yielded an object that Tornado could not convert into a future. That points to futures or tasks being yielded or awaited in a way Prefect doesn't expect. Here's what you could check or try, based on general best practices with Prefect:

1. Ensure proper use of async/await syntax: make sure your flow and tasks use `async` and `await` properly.
2. Check future management: verify that any operations yielding futures are handled correctly.
3. Version compatibility: make sure your Prefect library is up to date, as bugs sometimes get fixed in newer releases.

To provide more precise help, I could:
- Fetch code examples related to async operations in Prefect.
- Check the latest Prefect version to ensure compatibility.

Would you like to proceed with either of these options?
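The object in this error, `DoneAndNotDoneFutures`, is a plain result container rather than something awaitable, which is exactly the kind of value Tornado refuses to yield. Python's standard-library `concurrent.futures.wait` returns a named tuple with that same name, so a stdlib sketch shows what such an object looks like: two sets of futures, `done` and `not_done`, and nothing a coroutine runner could await. This uses a thread pool instead of a Dask cluster, `square` and `wait_for_squares` are hypothetical helpers, and the assumption that Dask's `distributed.wait` returns an equivalent tuple rests on the matching name in the error.

```python
import inspect
from concurrent.futures import ThreadPoolExecutor, wait


def square(x: int) -> int:
    return x * x


def wait_for_squares(values):
    """Submit work to a thread pool and return what concurrent.futures.wait gives back."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(square, v) for v in values]
        result = wait(futures)  # blocks until every future has finished
    return result


result = wait_for_squares([1, 2, 3])
done, not_done = result                   # unpacks like any 2-tuple
print(type(result).__name__)              # DoneAndNotDoneFutures
print(sorted(f.result() for f in done))   # [1, 4, 9]
print(len(not_done))                      # 0
print(inspect.isawaitable(result))        # False: nothing for a coroutine runner to await
```

The point of the sketch is the type, not the arithmetic: if a value like this ends up where an async scheduler expects an awaitable, a "yielded unknown object" error is the natural result.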
k
@Marvin what are the best versions of tornado, dask and prefect to use together?
m
Such excitement. The latest Prefect version contains numerous features, enhancements, and fixes. But now, to the compatibility you seek for Tornado and Dask with Prefect:

### Latest Prefect Version
* Latest version: the latest stable release of Prefect is generally the best choice. You can install or upgrade it via:
```bash
pip install prefect --upgrade
```
Or check the GitHub releases page for precise versions.

### Compatibility with Dask and Tornado
* Dask: use a version that Prefect's official documentation and environment setup recommend.
* Tornado: a version compatible with both the latest Dask and Prefect releases is optimal.

Unfortunately, I can't fetch specific versions directly. The closest steps to find compatibility:
1. Check environment files: look up `requirements.txt` or `environment.yaml` in the Prefect GitHub repository for version specifics.
2. Documentation: refer to Prefect's documentation for compatibility matrices, typically found in installation sections or configuration guides.

Would you like to proceed with any detailed code examples or another form of assistance? Sigh, my genius is never truly appreciated...
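Before comparing against any compatibility notes, it helps to know exactly what is installed, including whether Prefect is a 2.x release or a 3.0.0rc build. A minimal sketch using the standard library's `importlib.metadata`; the package list and the helper names `installed_versions` and `prefect_major_version` are illustrative assumptions, not part of Prefect or Dask.

```python
from importlib.metadata import PackageNotFoundError, version

# Packages mentioned in this thread (pandas appears in the error message)
PACKAGES = ("prefect", "dask", "distributed", "tornado", "pandas")


def installed_versions(packages=PACKAGES):
    """Map each package name to its installed version string, or None if absent."""
    report = {}
    for name in packages:
        try:
            report[name] = version(name)
        except PackageNotFoundError:
            report[name] = None
    return report


def prefect_major_version(report):
    """Return Prefect's major version (e.g. 2 or 3) from a report, or None."""
    ver = report.get("prefect")
    return int(ver.split(".")[0]) if ver else None


if __name__ == "__main__":
    report = installed_versions()
    for name, ver in report.items():
        print(f"{name:12} {ver or 'not installed'}")
    print("Prefect major version:", prefect_major_version(report))
```

Pasting this output into a thread like this one answers the "are you on 2.x or 3.0.0rc?" question up front.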
n
oooh, this is strange
```
Flow run encountered an exception. BadYieldError: yielded unknown object DoneAndNotDoneFutures(done={<Future: finished, type: pandas.core.frame.DataFrame, key: ('getitem-e89d377130996af054df80393b5bbf0d', 45)>
```
do you mind sharing what you did to get that?
k
@Nate I'm happy to! Thanks for the attention.
```python
# Reproduces the BadYieldError discussed in this thread (Prefect 2.x)
from distributed import wait
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

# Define the tasks
@task
def read_data(file_path):
    return dd.read_csv(file_path)

@task
def filter_data(df):
    filtered_df = df[df['value'] > 10]  # Example filter: select rows where 'value' > 10
    return filtered_df

@task
def add_column(df):
    df['new_value'] = df['value'] * 2  # Add a new column with some transformation
    return df

@task
def group_by_category(df):
    grouped_df = df.groupby('category').sum()  # Group by category and sum
    return grouped_df

@task
def save_data(df: dd.DataFrame, output_path):
    df.to_csv(output_path)

@task
def persist_intermediate(df: dd.DataFrame):
    persisted_df = df.persist()  # start computing the partitions on the Dask cluster
    # Block until the persisted partitions are done; wait() returns a
    # DoneAndNotDoneFutures, the object type named in the error
    wait(persisted_df)
    print(type(persisted_df))
    return persisted_df

# Define the flow
@flow(task_runner=SequentialTaskRunner)
def dask_data_manipulation(file_path: str, output_path: str):
    df = read_data(file_path)
    filtered_df = filter_data(df)
    persisted_df = persist_intermediate(filtered_df)
    print(type(persisted_df))
    modified_df = add_column(persisted_df)
    grouped_df = group_by_category(modified_df)
    save_data(grouped_df, output_path)


if __name__ == "__main__":
    # LocalCluster starts worker processes; the __main__ guard keeps them from
    # re-running this setup on platforms that spawn new processes
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)
    print(f"Dask dashboard available at: {client.dashboard_link}")

    file_path = "input_data.csv"  # Replace with your input CSV file path
    output_path = "output_data.csv"  # Replace with your desired output CSV file path
    dask_data_manipulation(file_path, output_path)
```
n
hmm, okay, I have reproduced this with the latest 2.x version of Prefect. Just to confirm, you are not using a 3.0.0rc version, right?
k
@Nate yes, I'm using Prefect 2. Thank you!
Hey @Nate, it would be perfect if anyone could give me a tip today about what is causing this error; we are facing some issues in our production environment because of it. Any help would be awesome. Thank you for your kindness.
n
hi @Kamilly Victória Ruseler, I think the issue here is related to the fact that tasks are run in worker threads in 2.x, since:
• If I remove the `@task` decorators in 2.x, it seems to work
• If I use Prefect 3.0.0rc, it also works, because the engine was refactored to run things on the main thread

I'm still not sure exactly what the error you and I got actually means, though.
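Nate's hypothesis turns on which thread the code actually runs on. A minimal stdlib sketch for checking that: call `where_am_i()` (a hypothetical helper) from a task body and from the flow body and print the result. Here a one-thread `ThreadPoolExecutor` stands in for an engine that runs task bodies in a worker thread; that stand-in is an assumption for illustration, not Prefect's actual implementation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def where_am_i() -> str:
    """Report whether the caller is running on the interpreter's main thread."""
    current = threading.current_thread()
    if current is threading.main_thread():
        return "main thread"
    return f"worker thread ({current.name})"


def run_in_worker_thread(fn):
    """Hypothetical stand-in for an engine that runs a task body in a worker thread."""
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="task-worker") as pool:
        return pool.submit(fn).result()


if __name__ == "__main__":
    print(where_am_i())                      # main thread (when run directly as a script)
    print(run_in_worker_thread(where_am_i))  # worker thread (task-worker_0)
```

If the check reports a worker thread inside the failing task and the code works once it runs elsewhere, that is consistent with the thread-based explanation above, though it does not by itself explain the `BadYieldError`.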