Sanjay Patel  05/26/2022, 10:32 AM
however, the flow contains a task that itself has a Prefect flow embedded in it. The primary task prepares an unknown number of parallel tasks (let's say 15 of them) and then tries to execute them. I would like the 15 to run in parallel, managed by Dask. I don't want to create a new Dask cluster from within the primary task (I'm not even sure that would work); instead I want to execute the nested flow on the same cluster. The following line works, but I'm not sure it will utilize all the available workers; it may only execute on the worker that initiates the flow. Any guidance on how I should be running flow2, which is prepared and called by the primary task in 'flow' above?
output = flow.run(executor=DaskExecutor(address=XXX, client_kwargs=XXX))
I believe I'd need to use something like this if I were using pure Dask: https://distributed.dask.org/en/stable/task-launch.html. But I'm not sure which Prefect method achieves the same thing, and I would like to keep the additional features that Prefect offers. Thanks so much in advance!
output2 = flow2.run()
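[Editor's note] To make the concern above concrete: if the nested flow ends up running entirely on the one worker that initiated it, its ~15 sub-jobs execute back to back rather than in parallel. A minimal stand-in using only the Python standard library (all names here are illustrative, not Prefect or Dask API):

```python
import time

def sub_job(i):
    time.sleep(0.05)  # stand-in for a small unit of real work
    return i * i

def primary_task_serial(n):
    # If the nested flow stays on the single worker that launched it,
    # its n sub-jobs run one after another instead of in parallel.
    return sum(sub_job(i) for i in range(n))

start = time.perf_counter()
serial_result = primary_task_serial(15)
serial_elapsed = time.perf_counter() - start
print(serial_result)  # 1015 (sum of squares 0..14)
```

With 15 sub-jobs of ~0.05 s each, the elapsed time is roughly 0.75 s regardless of how many workers the cluster has, which is exactly the behavior the question is trying to avoid.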
Kevin Kho  05/26/2022, 2:16 PM
You have to manage this yourself with native Dask. But if you use that approach on a Prefect Flow, the whole Flow gets sent to a worker together, so it won't use everything. So you need to break it up. The key thing here is to use worker_client, but in order to submit more work to the cluster from inside a task, you need to use the following syntax:

from dask.distributed import worker_client

@task
def calling_compute_in_a_task(filepath):
    # Hand extra work back to the same Dask cluster this task runs on
    with worker_client() as client:
        client.submit(more_work)
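[Editor's note] The worker_client pattern above is Dask-specific, but the underlying idea, a running task handing follow-up work back to the same pool instead of creating a second one inside itself, can be sketched with just the standard library. Everything below is an illustrative analogy, not Prefect or Dask API:

```python
from concurrent.futures import ThreadPoolExecutor, wait

# One shared pool stands in for the Dask cluster.
pool = ThreadPoolExecutor(max_workers=4)

def sub_job(i):
    # Stand-in for one of the ~15 parallel sub-tasks.
    return i * i

def primary_task(n):
    # Like worker_client(): submit follow-up work to the SAME pool
    # rather than building a second executor inside the task. This is
    # safe only because the pool has spare capacity beyond this task
    # itself -- the same reason Dask provides worker_client/secede.
    futures = [pool.submit(sub_job, i) for i in range(n)]
    wait(futures)
    return sum(f.result() for f in futures)

result = pool.submit(primary_task, 15).result()
print(result)  # 1015
pool.shutdown()
```

The sub-jobs here run on the pool's remaining workers while the primary task waits, which is the behavior worker_client buys you on a real Dask cluster.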
Sanjay Patel  05/26/2022, 8:03 PM