Kamilly Victória Ruseler
07/01/2024, 7:19 PM
Marvin
07/01/2024, 7:19 PM
BadYieldError
and troubleshooting flow run exceptions.
Let's stay hopeful for once.
How depressing. It seems the documentation search is not cooperating, and there are no relevant issues found in the GitHub repository.
Let's piece together what we know:
The error BadYieldError: yielded unknown object DoneAndNotDoneFutures suggests that there is a mismanagement of yielded objects in an asynchronous context, perhaps when yielding futures or tasks in a way that Prefect doesn't understand.
Here’s what you could consider checking or trying based on general best practices with Prefect:
1. Ensure Proper Use of Async/Await Syntax: Make sure your flow and tasks are using async and await properly.
2. Check Future Management: Verify that any operations yielding futures are handled correctly.
3. Version Compatibility: Make sure your Prefect library is the latest version, as bugs sometimes get fixed in newer releases.
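To make the error message more concrete, here is a minimal sketch that uses only Python's standard asyncio, not Tornado, Dask, or Prefect. It assumes the failure pattern is "a plain result object gets yielded or awaited as if it were a future". The names `DoneAndNotDone` and `wait_for_all` are hypothetical stand-ins for illustration, not the real distributed classes.

```python
import asyncio
from collections import namedtuple

# Hypothetical stand-in for the object named in the error message:
# a plain tuple of finished and unfinished futures, not itself awaitable.
DoneAndNotDone = namedtuple("DoneAndNotDone", ["done", "not_done"])


async def wait_for_all(futures):
    """Await the futures, then return a plain (non-awaitable) summary."""
    done, pending = await asyncio.wait(futures)
    return DoneAndNotDone(done=done, not_done=pending)


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.set_result(42)

    # Correct: await the coroutine, receive the summary as a normal value.
    summary = await wait_for_all([fut])

    # Incorrect: the summary is a result, not a future, so awaiting it fails.
    try:
        await summary
    except TypeError as exc:
        error = str(exc)
    else:
        error = None
    return summary, error


summary, error = asyncio.run(main())
print(f"done: {len(summary.done)}, not done: {len(summary.not_done)}")
print(f"awaiting the summary failed with: {error}")
```

In asyncio this mistake surfaces as a TypeError. Tornado reports the analogous situation, a coroutine yielding an object it does not recognize, as BadYieldError.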
To provide more precise help, I could:
- Fetch code examples related to async operations in Prefect.
- Check the latest Prefect version to ensure compatibility.
Would you like to proceed with any of these options?
Kamilly Victória Ruseler
07/01/2024, 7:40 PM
Marvin
07/01/2024, 7:41 PM
pip install prefect --upgrade
Or check their GitHub releases page for precise versions.
### Compatibility with Dask and Tornado
* Dask: Use a version that Prefect recommends in its official documentation and environment setup.
* Tornado: A version compatible with both the latest Dask and Prefect releases is optimal.
Unfortunately, I can't fetch specific versions directly. The most direct ways to check compatibility are:
1. Check Environment Files: Look up requirements.txt or environment.yaml in the Prefect GitHub repository for version specifics.
2. Documentation: Refer to Prefect’s documentation for compatibility matrices, typically found in installation sections or configuration guides.
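Before comparing against any published requirements, it may help to see what is actually installed. The following is a minimal sketch using only the standard library; the helper name `installed_versions` is made up for this example, and the package list is an assumption based on the libraries mentioned in this thread.

```python
from importlib import metadata


def installed_versions(packages):
    """Return {package: version string, or None if not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Packages discussed in this thread; adjust the list to your environment.
report = installed_versions(["prefect", "dask", "distributed", "tornado"])
for name, version in report.items():
    print(f"{name}: {version or 'not installed'}")
```

Sharing this output alongside the traceback makes it easier for others to reproduce the problem with the same versions.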
Would you like to proceed with any detailed code examples or another form of assistance? Sigh, my genius never truly appreciated...
Nate
07/01/2024, 7:53 PM
Flow run encountered an exception. BadYieldError: yielded unknown object DoneAndNotDoneFutures(done={<Future: finished, type: pandas.core.frame.DataFrame, key: ('getitem-e89d377130996af054df80393b5bbf0d', 45)>
do you mind sharing what you did to get that?
Kamilly Victória Ruseler
07/01/2024, 8:07 PM
from distributed import wait
import dask.dataframe as dd
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

# Define the tasks
@task
def read_data(file_path):
    return dd.read_csv(file_path)

@task
def filter_data(df):
    filtered_df = df[df['value'] > 10]  # Example filter: select rows where 'value' > 10
    return filtered_df

@task
def add_column(df):
    df['new_value'] = df['value'] * 2  # Add a new column with some transformation
    return df

@task
def group_by_category(df):
    grouped_df = df.groupby('category').sum()  # Group by category and sum
    return grouped_df

@task
def save_data(df: dd.DataFrame, output_path):
    df.to_csv(output_path)

@task
def persist_intermediate(df: dd.DataFrame):
    persisted_df = df.persist()
    wait(persisted_df)
    print(type(persisted_df))
    return persisted_df

# Define the flow
@flow(task_runner=SequentialTaskRunner)
def dask_data_manipulation(file_path: str, output_path: str):
    df = read_data(file_path)
    filtered_df = filter_data(df)
    persisted_df = persist_intermediate(filtered_df)
    print(type(persisted_df))
    modified_df = add_column(persisted_df)
    grouped_df = group_by_category(modified_df)
    save_data(grouped_df, output_path)

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)
print(f"Dask dashboard available at: {client.dashboard_link}")

file_path = "input_data.csv"  # Replace with your input CSV file path
output_path = "output_data.csv"  # Replace with your desired output CSV file path
dask_data_manipulation(file_path, output_path)
Nate
07/01/2024, 9:00 PM
Kamilly Victória Ruseler
07/02/2024, 11:09 AM
Kamilly Victória Ruseler
07/02/2024, 3:56 PM
Nate
07/02/2024, 7:41 PM