
Vishal Talasani

02/16/2021, 8:55 AM
Hey everyone, I’m very new to Prefect and am struggling with something simple. I can’t get Prefect Server to execute multiple jobs in parallel. My guess is that I have something basic misconfigured, but I’m not sure what exactly. I’ve set the executor for my flow to be
LocalDaskExecutor(scheduler="threads", num_workers=5)
but it seems like I can still only execute my flows serially. Does anyone here have experience with this? Thank you so much for the help!

Greg Roche

02/16/2021, 9:20 AM
Just to clarify, Prefect supports two sorts of parallel execution:
1. Running multiple flows at the same time
2. Within a flow, running multiple tasks at the same time
Prefect Server has no limits on running multiple flows at the same time; you just register and schedule them. For running multiple tasks at the same time within one flow, using a Dask executor like you mentioned above is necessary but not sufficient: you also need to structure your flow in a way which allows multiple tasks to execute at the same time, most probably by using mapping.
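As a rough illustration of what running independent task calls on a pool of five threads means, here is a minimal sketch that uses only Python's standard library. It is a stand-in for the idea, not Prefect's API: `ThreadPoolExecutor`, the `task` function, and `NUM_WORKERS` are all assumptions made for this example. The barrier is only there to show that the calls genuinely overlap.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# Hypothetical worker count, mirroring num_workers=5 from the question.
NUM_WORKERS = 5

# The barrier releases only once NUM_WORKERS calls are waiting on it together.
barrier = threading.Barrier(NUM_WORKERS)


def task(x):
    # Each call blocks here until all NUM_WORKERS calls are in flight,
    # so the map below can only complete if the calls run concurrently.
    barrier.wait(timeout=5)
    return x * 2


with ThreadPoolExecutor(max_workers=NUM_WORKERS) as pool:
    # Apply the same function to every input, one call per worker thread.
    results = list(pool.map(task, range(NUM_WORKERS)))

print(results)  # [0, 2, 4, 6, 8]
```

If the calls ran one at a time, the first would time out at the barrier and raise an error instead of returning a result.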

Vishal Talasani

02/16/2021, 9:54 AM
Ah yes, makes sense. In my case, I’m trying to run multiple tasks simultaneously (within one flow). My flow is structured like the attached image; although I’m not calling map, I would think the LocalDaskExecutor would run these in parallel? Should I instead explicitly use map to get the parallelization? Thanks a lot for the help with this Greg!

Greg Roche

02/16/2021, 10:18 AM
Yes, I would definitely use task mapping in this situation to get parallelisation. It might be worth running
flow.visualize()
to see the generated DAG which Prefect will use to execute the run. This will show how Prefect associates the tasks with each other and which dependencies each task needs to wait for before it can run.

Vishal Talasani

02/16/2021, 8:32 PM
Thank you, this is very helpful! Looking at flow.visualize(), it seems like all the dependencies are met for my nodes. I’ve also tried this with another workflow that uses map, and it seems like map is also executing one at a time. Right before I register my flow, I set
flow.executor = LocalDaskExecutor(scheduler='threads')
flow.run_config = LocalRun()
Should I be setting another variable on my flow before registering? Or is there perhaps something I need to run at the agent level to make it so I can run a local dask executor? Thanks again for the help!
Ahh never mind, I got it! I was setting the flow executor in my main method, but because I’m using file storage, the executor isn’t being set when I’m running remotely. I moved LocalDaskExecutor to be run when the flow is created, and everything works now. Thank you Greg, really appreciate your help through this!
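The pitfall described here can be reproduced without Prefect. The sketch below assumes that the flow file is loaded as a module rather than executed as a script when the flow runs remotely, so anything under the main guard is skipped. The `Flow` class, the executor strings, and `load_flow_file` are hypothetical stand-ins for illustration, not Prefect's API.

```python
import types

# Hypothetical flow file. The stand-in Flow class and executor strings are
# illustrative only and do not come from Prefect.
FLOW_FILE_SOURCE = '''
class Flow:
    def __init__(self, name, executor="default (serial)"):
        self.name = name
        self.executor = executor

# Executor set when the flow is created: applied whenever the file is loaded.
flow_fixed = Flow("fixed", executor="local dask (threads)")

# Executor set under the main guard: applied only when run as a script.
flow_broken = Flow("broken")
if __name__ == "__main__":
    flow_broken.executor = "local dask (threads)"
'''


def load_flow_file(source, module_name):
    """Execute the source in a fresh module whose __name__ is module_name."""
    module = types.ModuleType(module_name)
    exec(compile(source, "<flow_file>", "exec"), module.__dict__)
    return module


as_script = load_flow_file(FLOW_FILE_SOURCE, "__main__")   # like `python flow_file.py`
as_loaded = load_flow_file(FLOW_FILE_SOURCE, "flow_file")  # like loading from storage

print(as_script.flow_broken.executor)  # local dask (threads)
print(as_loaded.flow_broken.executor)  # default (serial)
print(as_loaded.flow_fixed.executor)   # local dask (threads)
```

Only the flow that receives its executor at creation time keeps it in both cases, which matches the fix described in the message above.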

Greg Roche

02/17/2021, 8:14 AM
Glad to have helped!