Kamilly Victória Ruseler
09/20/2024, 1:29 PM
Marvin
09/20/2024, 1:29 PM
`get_async_dask_client` if you're using asynchronous operations.
2. Awaiting Computations: When using the async Dask client, you need to await the computation. For example:
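```python
import dask
from prefect import task
from prefect_dask import get_async_dask_client


@task
async def compute_task():
    async with get_async_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # The async client returns a Dask future; awaiting it yields the DataFrame
        summary_df = await client.compute(df.describe())
    return summary_df
```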
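3. Using DaskTaskRunner: Although you mentioned not using `DaskTaskRunner`, it is the recommended setup: it submits each task run to Dask and, when no scheduler address is given, creates a temporary cluster for the flow run. The `get_dask_client`/`get_async_dask_client` helpers connect to that runner's cluster, so they are meant for tasks run by `DaskTaskRunner`: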
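```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    # In Prefect 3, .submit() returns a future synchronously (even for async tasks)
    # and .result() blocks until the task finishes, so neither is awaited
    prefect_future = compute_task.submit()
    return prefect_future.result()

# In a notebook you can run `await dask_flow()`; in a script, use the __main__ guard in item 4
```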
4. Guarding the Main Script: A local Dask cluster starts its worker processes with the `spawn` method, which re-imports your main module, so guard the script's entry point to avoid warnings or errors:
```python
import asyncio

if __name__ == "__main__":
    asyncio.run(dask_flow())
```
5. Cluster Address: If you don't provide the address of a Dask scheduler, Prefect creates a temporary local cluster automatically. You can size it by passing `n_workers` and `threads_per_worker` through `cluster_kwargs`:
```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(cluster_kwargs={"n_workers": 4, "threads_per_worker": 2})
```
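As far as I know, the temporary cluster defaults to `distributed.LocalCluster`, so these `cluster_kwargs` end up as `LocalCluster` arguments (worth checking against your installed `prefect-dask` version). A minimal plain-Dask sketch of what the two settings control; the helper name `worker_thread_counts` is hypothetical:

```python
from dask.distributed import Client, LocalCluster


def worker_thread_counts(n_workers: int, threads_per_worker: int) -> list[int]:
    # processes=False runs workers as threads in this process to keep the demo light;
    # the default LocalCluster used for a flow run may start worker processes instead
    with LocalCluster(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        processes=False,
    ) as cluster, Client(cluster) as client:
        client.wait_for_workers(n_workers)
        # nthreads() maps each worker address to its thread count
        return sorted(client.nthreads().values())


if __name__ == "__main__":
    print(worker_thread_counts(n_workers=4, threads_per_worker=2))
```

Total parallelism is roughly `n_workers * threads_per_worker`; fewer threads per worker helps when tasks hold the GIL.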
Check out the Prefect 3.x Dask usage guide for more details: https://docs-3.prefect.io/integrations/prefect-dask/usage_guide.
### GitHub Issue Reference:
- There's a known issue with `get_dask_client` not working with Coiled clusters, though it works with `dask.distributed.LocalCluster`: https://github.com/PrefectHQ/prefect/issues/12971.
If none of these suggestions resolves your issue, more detailed logs or the Dask dashboard may provide additional insight.
Kamilly Victória Ruseler
09/20/2024, 1:34 PM
Marvin
09/20/2024, 1:35 PM
`get_async_dask_client()` is encouraged for async operations; computations submitted through it must be awaited.
- Example:
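```python
import dask
from prefect import task
from prefect_dask import get_async_dask_client


@task
async def compute_task():
    async with get_async_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # Await the Dask future to get the computed DataFrame
        summary_df = await client.compute(df.describe())
    return summary_df
```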
2. DaskTaskRunner:
- When using Dask with Prefect, `DaskTaskRunner` is the recommended way to run tasks on a Dask cluster. It can also create a temporary cluster if you don't connect it to an existing one.
- Example:
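```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    # Prefect 3: submit() and result() are synchronous, even in async flows
    prefect_future = compute_task.submit()
    return prefect_future.result()
```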
3. Temporary Clusters:
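- `DaskTaskRunner` can create a temporary cluster with any of Dask's cluster managers (`cluster_class`), configured through `cluster_kwargs`. Because each flow run gets its own cluster, you can enable adaptive scaling per flow run (`adapt_kwargs`).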
4. Automatic Local Cluster:
- If no Dask scheduler address is provided, Prefect creates a local cluster automatically. You can specify settings such as `n_workers` and `threads_per_worker`.
5. Concurrency and Submission:
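- In Prefect 3, `.submit()` returns a future synchronously and `future.result()` is a blocking call, even inside async flows, so neither is awaited. What must be awaited is Dask work driven by an async client, such as `client.compute(...)` in a task that uses `get_async_dask_client`; a missing `await` there leaves you with an unresolved future instead of a result.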
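The following is a generic asyncio sketch, not Prefect code. It assumes you want to wait on a blocking `.result()` from inside an async function without stalling other coroutines: a `concurrent.futures.Future` stands in for a submitted task's future, and `asyncio.to_thread` moves the blocking wait off the event loop. Applying the same pattern to a Prefect future is an assumption, not documented Prefect guidance. The helpers `slow_square` and `heartbeat` are hypothetical.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: int) -> int:
    time.sleep(0.2)  # stands in for a long-running remote computation
    return x * x


async def heartbeat(ticks: list[float]) -> None:
    # Records timestamps; it keeps ticking only if the event loop is free
    for _ in range(4):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def main() -> tuple[int, list[float]]:
    ticks: list[float] = []
    with ThreadPoolExecutor() as pool:
        future = pool.submit(slow_square, 7)  # a blocking-style future
        hb = asyncio.create_task(heartbeat(ticks))
        # Run the blocking .result() in a worker thread so heartbeat keeps running
        value = await asyncio.to_thread(future.result)
        await hb
    return value, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```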
6. Enhancements in Documentation:
- Updated documentation provides clearer guides on how to integrate Dask with Prefect, including better examples and migration tips.
- For more detailed information, refer to the Prefect 3 Dask Usage Guide: https://docs-3.prefect.io/integrations/prefect-dask/usage_guide
For any specific issues or further details, refer to the Prefect Dask Integration section in the documentation: https://docs-3.prefect.io/integrations/prefect-dask/index