Tim Galvin
08/18/2025, 1:06 PM
def predict_with_dask(..., dask_client):
    # compute a first round of delayed work
    results = dask_client.compute([big_func(a) for a in range(200)])
    # feed those results into a second round
    more_results = dask_client.compute([bigger_func(r) for r in results])
Now let's say I have a Prefect task/flow like:
@task
def my_task():
    with get_dask_client() as client:
        return crystalball_predict(
            ms=ms,
            dask_client=client,
        )
@flow
def my_flow():
    biggest = [my_task.submit(blah) for blah in range(200)]
I am running my flow with a dask.distributed cluster that is deployed on a large HPC of 500+ nodes.
What is the expected scaling here? Does a single .compute() from any one of the my_task functions cause all other Prefect tasks to block? Does this setup allow parallel .compute() calls to run, or should there be some other way of invoking the parallelism of the many delayed objects in the predict_with_dask workflow?

Brendan Dalpe
08/18/2025, 2:32 PM
Prefect uses the ThreadPoolTaskRunner by default when you call .submit. This fires up a local thread pool inside the Prefect runner for that flow.
So each task will get executed inside a ThreadPool with max threads specified by your settings (or sys.maxsize by default, which basically lets us go as fast as the single worker will go). In your code, we'd dispatch all of the tasks to Dask immediately because we don't need to wait. Blocking only happens when we wait on or gather results.
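For illustration, a minimal sketch of that default behaviour (assuming Prefect 3, where the default runner is ThreadPoolTaskRunner; my_task here is just a placeholder):

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def my_task(n: int) -> int:
    return n * 2  # placeholder work

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))  # cap the local pool
def my_flow():
    futures = [my_task.submit(n) for n in range(200)]   # dispatched immediately
    return [f.result() for f in futures]                # blocking happens only here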
I'd recommend switching up your Prefect code slightly to use the DaskTaskRunner instead of calling get_dask_client inside each task, which spawns a new client instance per task and is somewhat wasteful.
Here's a sample of what I think you could run:
from prefect import flow, task
from prefect.futures import wait
from prefect_dask.task_runners import DaskTaskRunner
from dask.distributed import worker_client

@task
def crystalball_predict(ms):
    # worker_client() reuses the Dask worker's existing connection to the
    # scheduler, so no new Client is created per task.
    with worker_client() as dask_client:
        results = dask_client.compute([big_func(a) for a in range(200)])
        more_results = dask_client.compute([bigger_func(r) for r in results])
        return dask_client.gather(more_results)

@flow(task_runner=DaskTaskRunner())
def my_flow():
    biggest = [crystalball_predict.submit(blah) for blah in range(200)]
    wait(biggest)
Some docs that might be useful:
• https://docs.prefect.io/v3/how-to-guides/workflows/run-work-concurrently
• https://distributed.dask.org/en/latest/api.html#distributed.get_worker

Tim Galvin
08/19/2025, 12:56 AM
I am already using the DaskTaskRunner with a dask_jobqueue.SLURMCluster as the cluster implementation. So as far as that goes I think I am OK for the moment, and my example is closer to your example than I first wrote; the only thing I had missing was the client.gather call - I mean this in both my example as well as the external package.
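For concreteness, a sketch of such a runner (the cores/memory/adapt numbers below are placeholders, not my actual SLURM settings):

from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

# DaskTaskRunner spins up a dask_jobqueue.SLURMCluster per flow run.
slurm_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={"cores": 24, "memory": "64GB"},  # placeholder values
    adapt_kwargs={"minimum": 10, "maximum": 500},    # placeholder values
)

@flow(task_runner=slurm_runner)
def my_flow():
    ...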
I think I understand everything so far in your reply. I think my question boils down further to: how many Client.compute() and/or Client.gather() calls can be executing at a time in a single distributed Dask cluster instance? Should parallel my_tasks be running via .submit(), would the Client.compute() that they each hit allow all to run at once, or does it go in a FIFO-like manner?

Brendan Dalpe
08/20/2025, 7:24 PM
With task_runner=DaskTaskRunner() and calling .submit inside that flow, we will go as fast as Dask allows for parallelism 🙂
Your `.compute`/`.gather` calls shouldn't block the main flow loop assuming they get dispatched to Dask.Tim Galvin
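To illustrate the distinction (a sketch only; big_func stands in for your real delayed work):

import dask
from dask.distributed import Client

@dask.delayed
def big_func(a):          # stand-in for the real delayed work
    return a ** 2

client = Client()         # or the flow's existing cluster client

# compute() on a list of delayed objects returns Futures immediately;
# the graph executes on the cluster in the background.
futures = client.compute([big_func(a) for a in range(200)])

# Only gather() blocks, waiting for the results to come back.
results = client.gather(futures)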
08/21/2025, 1:04 AM
On the .submit side of things, indeed I think the submission of each task goes as fast as it can. I am currently on Prefect 2 and need to move to Prefect 3. I believe the engine there would be even faster.
The behaviour I am seeing is confusing. I created a separate Prefect task that constructs a single Dask DAG across all inputs, and I modified the external package to return the delayed objects rather than calling Client.compute itself.
Once I collected all these delayed objects myself and called Client.compute() once across all objects, I saw about a 2x speed-up. The allocated workers were all running much closer to 100% CPU.
It felt like the Dask Client I am using (retrieved within each Prefect task via a get_dask_client call) only allows a single .compute to be running at a time. There is some interplay with this external package as well that I am trying to separate.
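Roughly, the pattern that gave the speed-up looked like this (a sketch; big_func/bigger_func stand in for the external package's delayed pieces):

import dask
from prefect import task
from prefect_dask import get_dask_client

@dask.delayed
def big_func(a):          # stand-in for the external package's work
    return a ** 2

@dask.delayed
def bigger_func(r):       # stand-in for the second-stage work
    return r + 1

@task
def predict_all(inputs):
    # Build one DAG spanning every input, instead of one compute per task.
    delayeds = [bigger_func(big_func(a)) for a in inputs]
    with get_dask_client() as client:
        # A single compute over the whole graph lets the scheduler spread
        # work across all allocated workers at once.
        futures = client.compute(delayeds)
        return client.gather(futures)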