# prefect-community

Sanjay Patel

05/26/2022, 10:32 AM
Hi, can I please get some assistance? I'm successfully running a flow in my Dask cluster. The flow is launched from outside the Dask cluster using the following
output =, client_kwargs=XXX))
However, the flow contains a task that itself has a Prefect flow embedded in it: the primary task prepares an unknown number of parallel tasks (let's say 15 of them) and then tries to execute them. I would like the 15 to run in parallel, managed by Dask. I don't want to create a new Dask cluster from within the primary task (I'm not even sure that would work); instead I want to execute the flow in the same cluster. The following line works, but I'm not sure it will utilize all the available workers; it may only execute on the worker that initiates the flow. Any guidance on how I should be running flow2, which is prepared and called by the primary task in 'flow' above?
output2 =
I believe I'll need to use something like this if I were using pure Dask, but I'm not sure which Prefect method achieves this, and I would like to keep the additional features that Prefect offers. Thanks so much in advance!

Kevin Kho

05/26/2022, 2:16 PM
I don’t believe this can be done if you are using a Flow inside a task, because of how the Flow code is hooked up to use an executor. It uses
but in order to submit more work to a cluster using a task, you need to use the following syntax:
from dask.distributed import worker_client

def calling_compute_in_a_task(filepath):
    with worker_client():
You have to manage this yourself with native Dask. But if you use
on a Prefect Flow, the whole Flow gets sent to a worker together so it won’t use everything. So you need to break it up. The key thing here is to use
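For illustration only (this is not code from the thread), here is a minimal sketch of the `worker_client` pattern: a function that is already running on a Dask worker submits its sub-items back to the same cluster instead of running them locally. The names `process_item` and `fan_out_from_task` are hypothetical, and the sketch assumes the function is executed on a Dask worker, for example as the body of a Prefect task run by a Dask-backed executor.

```python
def process_item(item):
    """Hypothetical unit of work, standing in for one of the ~15 sub-tasks."""
    return item * 2


def fan_out_from_task(items):
    """Meant to run ON a Dask worker; fans the items out to the same cluster.

    Assumption: the caller is already executing inside a Dask worker.
    Outside a worker, worker_client() has no cluster to attach to and fails.
    """
    # Imported lazily so this module can be loaded without a running cluster.
    from dask.distributed import worker_client

    # worker_client() hands back a Client connected to the current scheduler
    # and frees this worker's thread while it waits on the results.
    with worker_client() as client:
        futures = client.map(process_item, items)  # one future per item
        return client.gather(futures)              # block until all finish
```

From outside the cluster, something like `client.submit(fan_out_from_task, list(range(15))).result()` on an existing `dask.distributed.Client` would run the outer function on one worker and let the scheduler spread the 15 items across the rest.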

Sanjay Patel

05/26/2022, 8:03 PM
@Kevin Kho great, thanks for clarifying and the work-around. I will give this a go