Tim Galvin
08/18/2025, 1:06 PM
```python
def predict_with_dask(..., dask_client):
    results = dask_client.compute([big_func(a) for a in range(200)])
    more_results = dask_client.compute([bigger_func(r) for r in results])
```
Now let's say I have a Prefect task/flow like:
```python
@task
def my_task():
    with get_dask_client() as client:
        return crystalball_predict(
            ms=ms,
            dask_client=client,
        )
```
```python
@flow
def my_flow():
    biggest = [my_task.submit(blah) for blah in range(200)]
```
I am running my flow with a dask.distributed cluster that is deployed on a large HPC system of 500+ nodes.
What is the expected scaling here? Does a single `.compute()` from any one of the `my_task` functions cause all other Prefect tasks to block? Does this setup allow parallel `.compute()` calls to run, or should there be some other way of invoking the parallelism of the many delayed objects in the `predict_with_dask` workflow?
Brendan Dalpe
08/18/2025, 2:32 PM
Prefect uses the `ThreadPoolTaskRunner` by default when you call `.submit`. This fires up a local thread pool inside the Prefect runner for that flow. So each task will get executed inside a thread pool with max threads specified by your settings (or `sys.maxsize` by default, which basically lets us go as fast as the single worker will go). In your code, we'd dispatch all of the tasks to Dask immediately because we don't need to wait. Blocking only happens when we `wait` or `gather` results.
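For reference, a minimal sketch of what that default looks like when capped (Prefect 3 API; the `max_workers` value and the task body are illustrative assumptions, not from this thread):
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def my_task(n: int) -> int:
    return n * 2  # stand-in work

# max_workers=8 is an arbitrary illustrative cap; by default the pool
# is effectively unbounded (sys.maxsize), as described above.
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow():
    # .submit dispatches each task to the local thread pool immediately;
    # blocking only happens when the futures are resolved.
    futures = [my_task.submit(n) for n in range(200)]
    return [f.result() for f in futures]
```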
I'd recommend switching up your Prefect code slightly to use the `DaskTaskRunner` instead of calling `get_dask_client` inside each task; the latter spawns a new client instance per task, which is somewhat wasteful.
Here's a sample of what I think you could run:
```python
from prefect import flow, task
from prefect.futures import wait
from prefect_dask.task_runners import DaskTaskRunner
from dask.distributed import worker_client

@task
def crystalball_predict(ms):
    with worker_client() as dask_client:
        results = dask_client.compute([big_func(a) for a in range(200)])
        more_results = dask_client.compute([bigger_func(r) for r in results])
        return dask_client.gather(more_results)

@flow(task_runner=DaskTaskRunner())
def my_flow():
    biggest = [crystalball_predict.submit(blah) for blah in range(200)]
    wait(biggest)
```
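(A design note, assuming standard dask.distributed behaviour: `worker_client` is used here rather than constructing a new `Client` because, under the `DaskTaskRunner`, the task body already runs on a Dask worker. `worker_client` secedes the task from the worker's thread pool while it blocks on `gather`, so the wait cannot deadlock the cluster.)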
Some docs that might be useful:
• https://docs.prefect.io/v3/how-to-guides/workflows/run-work-concurrently
• https://distributed.dask.org/en/latest/api.html#distributed.get_worker
Tim Galvin
08/19/2025, 12:56 AM
I am already using the `DaskTaskRunner` with a `dask_jobqueue.SLURMCluster` as the cluster implementation. So as far as that goes I think I am OK for the moment, and my example is closer to your example than I first wrote; the only thing I had missing was the `client.gather` call - I mean this in both my example and the external package.
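For context, that setup looks roughly like the sketch below (every queue/resource value is an illustrative assumption, not taken from this thread):
```python
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

# cluster_class is given as a dotted path so dask_jobqueue is only
# imported where the flow actually runs.
slurm_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={
        "queue": "compute",      # assumed partition name
        "cores": 16,             # assumed cores per SLURM job
        "memory": "64GB",        # assumed memory per SLURM job
        "walltime": "04:00:00",
    },
    adapt_kwargs={"minimum": 1, "maximum": 64},  # adaptive scaling bounds
)

@flow(task_runner=slurm_runner)
def my_flow():
    ...
```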
I think I understand everything so far in your reply. I think my question boils down further to: how many `Client.compute()` and/or `Client.gather()` calls can be executing at a time in a single distributed Dask cluster instance? Should parallel `my_task`s be running via `.submit()`, would the `Client.compute()` that they each hit allow all to run at once, or does it go in a FIFO-like manner?
08/20/2025, 7:24 PMtask_runner=DaskTaskRunner()
and calling .submit
inside that method we will go as fast as Dask allows for parallelism 🙂
Your `.compute`/`.gather` calls shouldn't block the main flow loop assuming they get dispatched to Dask.Tim Galvin
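To make the blocking semantics concrete, a small sketch against a local cluster (`big_func` and the local `Client` are stand-ins for illustration):
```python
import dask
from dask.distributed import Client

@dask.delayed
def big_func(a):
    return a * a  # stand-in work

client = Client()  # local cluster, purely for demonstration

# Client.compute returns distributed futures immediately; the work runs
# on the cluster in the background while this caller carries on.
futures = client.compute([big_func(a) for a in range(200)])

# Only gather blocks, and only this caller; other tasks' compute calls
# can keep running on the same cluster concurrently.
results = client.gather(futures)
```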
Tim Galvin
08/21/2025, 1:04 AM
On the `.submit` side of things, I think the submission of each task indeed goes as fast as it can. I am currently on Prefect 2 and need to move to Prefect 3; I believe the engine there would be even faster.
The behaviour I am seeing is confusing, though. I created a separate Prefect task that constructs a single Dask DAG across all inputs, and I modified the external package to return the delayed objects rather than calling `Client.compute` itself.
Once I collected all these delayed objects myself and called `Client.compute()` once across all of them, I saw about a 2x speed-up. The allocated workers were all running much closer to 100% CPU.
It felt like the Dask `Client` I am using (retrieved within each Prefect task via a `get_dask_client` call) only allows a single `.compute` to be running at a time. There is some interplay with this external package as well that I am trying to separate out.
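For concreteness, the restructuring described above might look something like this (a sketch only; `predict_one` and `run_all` are hypothetical names standing in for the external package's internals):
```python
import dask
from dask.distributed import Client

@dask.delayed
def predict_one(ms, a):
    ...  # stand-in for the external package's per-chunk work

def predict_with_dask(ms):
    # Return delayed objects instead of calling Client.compute here.
    return [predict_one(ms, a) for a in range(200)]

def run_all(all_ms, client: Client):
    # Build one DAG across every input, then call compute() once so the
    # scheduler can overlap work from all inputs at the same time.
    delayed = [d for ms in all_ms for d in predict_with_dask(ms)]
    futures = client.compute(delayed)
    return client.gather(futures)
```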