<@ULVA73B9P> We use Prefect with Dask, but we don'...
# marvin-in-the-wild
k
@Marvin We use Prefect with Dask, but we don't use DaskTaskRunner. We create a Dask client inside the flow. At some points in the code, we build lazy calculations on the dataset and then compute them all at once: we get the client with `client = get_client()` (from `distributed`) and call `client.compute(values_to_compute)`. This worked normally with Prefect 2, but with Prefect 3 this compute never runs. It doesn't raise an error; nothing happens and it just keeps running forever.
m
Your issue with the Dask computation hanging in Prefect 3 might be due to changes in how Prefect integrates with Dask. Here are a few key points and suggestions:

1. Async Context for Dask Client: Prefect 3 has been updated to prefer async clients. Ensure that you are using `get_async_dask_client` if you're using asynchronous operations.
2. Awaiting Computations: When using the async Dask client, you need to await the computation. For example:
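```python
import dask
from prefect import task
from prefect_dask import get_async_dask_client

@task
async def compute_task():
    async with get_async_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # Awaiting the Dask future returns the computed result
        summary_df = await client.compute(df.describe())
        return summary_df
```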
3. Using DaskTaskRunner: Although you mentioned not using `DaskTaskRunner`, it's highly recommended as it ensures proper integration:
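```python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    # In Prefect 3, .submit() and .result() are synchronous calls
    prefect_future = compute_task.submit()
    return prefect_future.result()

# Top-level await only works in a notebook or async REPL
await dask_flow()
```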
4. Guarding the Main Script: To avoid potential warnings or errors, ensure that your script is guarded:
```python
if __name__ == "__main__":
    import asyncio
    asyncio.run(dask_flow())
```
5. Cluster Address: If you don't provide the address of a Dask scheduler, Prefect creates a temporary local cluster automatically. Specifying `n_workers` and `threads_per_worker` might help:
```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(cluster_kwargs={"n_workers": 4, "threads_per_worker": 2})
```
Check out the Prefect 3.x Dask usage guide for more details: https://docs-3.prefect.io/integrations/prefect-dask/usage_guide.

### GitHub Issue Reference:
- There's a known issue with `get_dask_client` not working with Coiled clusters, but it works with `dask.distributed.LocalCluster`: https://github.com/PrefectHQ/prefect/issues/12971.

If none of these solutions resolve your issue, debugging with more logs or checking the Dask dashboard may provide additional insights.
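To make a silent hang visible while debugging, one option is to put a timeout on the compute step and print the dashboard link. The following is only a minimal sketch using plain `dask` and `distributed`, with no Prefect involved. The helper name `compute_with_timeout`, the 30-second timeout, and the in-process local cluster are assumptions for illustration, not part of your setup:

```python
import dask
from distributed import Client, wait


def compute_with_timeout(client, lazy_values, timeout_s=60):
    """Submit lazy values and fail loudly instead of waiting forever."""
    # Passing a list returns a list of futures, one per lazy value
    futures = client.compute(lazy_values)
    # Raises a timeout error if anything is still pending after timeout_s
    wait(futures, timeout=timeout_s)
    return client.gather(futures)


if __name__ == "__main__":
    # In-process cluster (threads only) keeps the sketch self-contained
    with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
        print("Dashboard:", client.dashboard_link)
        lazy = [dask.delayed(sum)([1, 2, 3]), dask.delayed(max)([4, 5, 6])]
        print(compute_with_timeout(client, lazy, timeout_s=30))
```

If the timeout fires inside your flow, the dashboard's task stream can show whether the tasks ever reached the scheduler.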
k
@Marvin can you please tell me everything that changed in the integration of prefect with dask in prefect 3
m
Here's a summary of key changes and enhancements in the integration of Prefect with Dask from Prefect 2 to Prefect 3:

1. Async Support:
   - Prefect 3 prefers using asynchronous clients where possible. Specifically, `get_async_dask_client()` is encouraged for async operations, and computations submitted through it need to be awaited.
   - Example:
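```python
import dask
from prefect import task
from prefect_dask import get_async_dask_client

@task
async def compute_task():
    async with get_async_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # Awaiting the Dask future returns the computed result
        summary_df = await client.compute(df.describe())
        return summary_df
```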
2. DaskTaskRunner:
   - While using Dask with Prefect, the `DaskTaskRunner` is recommended to handle execution properly. This task runner can create temporary clusters if necessary.
   - Example:
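```python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    # In Prefect 3, .submit() and .result() are synchronous calls
    prefect_future = compute_task.submit()
    return prefect_future.result()
```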
3. Temporary Clusters:
   - `DaskTaskRunner` can create a temporary cluster using Dask's cluster-manager options. This supports adaptive scaling per flow run.
4. Automatic Local Cluster:
   - If no Dask scheduler address is provided, Prefect will create a local cluster automatically. You can specify settings like `n_workers` and `threads_per_worker`.
5. Concurrency and Submission:
   - The way tasks are submitted to Dask clusters and how results are awaited has been streamlined. Ensure all computations within flows are properly awaited to avoid hanging or blocking behavior.
6. Enhancements in Documentation:
   - Updated documentation provides clearer guides on how to integrate Dask with Prefect, including better examples and migration tips.
   - For more detailed information, refer to the Prefect 3 Dask integration docs: Prefect 3 Dask Usage Guide.

For any specific issues or further details, refer to the Prefect Dask Integration section in the documentation: https://docs-3.prefect.io/integrations/prefect-dask/index
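As a small illustration of the "properly awaited" point, here is a sketch in plain `asyncio` (not Prefect or Dask code) showing that calling an async function without `await` only creates a coroutine object and never runs it. The `compute` helper is a hypothetical stand-in for any async computation:

```python
import asyncio


async def compute(x: int) -> int:
    """Hypothetical stand-in for an async computation."""
    await asyncio.sleep(0)
    return x * 2


async def without_await() -> bool:
    pending = compute(21)  # only creates a coroutine object; nothing runs
    still_pending = asyncio.iscoroutine(pending)
    pending.close()  # discard it cleanly to avoid a RuntimeWarning
    return still_pending


async def with_await() -> int:
    return await compute(21)  # actually runs compute() to completion


if __name__ == "__main__":
    print(asyncio.run(without_await()))
    print(asyncio.run(with_await()))
```

The first call reports that the work is still an unstarted coroutine, while the second returns the computed value. The same kind of mistake in a flow can look like a computation that silently never happens.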