# ask-community
*Daryl:*
*State of Concurrency/Parallelism for @flows*
I've been digging through the docs and GitHub, but I'm having a real problem telling whether concurrent/parallel execution of non-async subflows is supported in Prefect at this point. Can someone give a definitive answer? There are a number of open GitHub tickets that relate to this (as well as an async workaround, which will not work for my case) and a `flow.submit` feature request, but I can't find docs or a solid answer on where this functionality stands. In sum, I'm trying to take a list of files I generate from a directory read and submit each one to another flow for processing in a tasked sequence, up to the max concurrency limit (to replace what I have in Airflow). I want the second flow/subflow to be the one that runs in parallel (or, minimally, concurrently). If this is possible in Prefect (without async), can someone point me at the docs or a code example? It's tricky to tell with Prefect 2.0 whether this works with `flow.submit` and a subflow.
*Oscar Björhn:*
My understanding is that you can make this work with the built-in functionality as long as you don't require the subflows to run in some other infrastructure. In my team, we do have that requirement (we want each subflow to be run in its own docker container), and we had to write some custom code in order to make that work.
If you can let me know whether you have the same requirement regarding infrastructure, maybe I can point you in the right direction.
*Daryl:*
@Oscar Björhn Thanks for replying! I don't have that requirement, I believe. I have a Docker Compose setup with the one Prefect server and a CLI service that connects to it for command and control. Other than that, it's pretty straight-up `docker compose`. I'd love a pointer if you've got something, but I'm honestly stumped on how to make this work as a subflow off the main flow concurrently. Do you have any code examples? I really just need to see how someone did it to make it work, I feel.
*Sean Davis:*
I've done some brute-force parallelization by creating a deployment for the subflow and then calling `run_deployment(..., timeout=0)` on the subflow deployment. The `timeout=0` turns the `run_deployment` call into a non-blocking operation.
My subflows are each computationally intensive, so running them concurrently on a single "node" wasn't an option.
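A minimal sketch of that calling pattern, assuming a hypothetical subflow deployment named "process-file/process-file-deploy" (the names and the `file` parameter are illustrative, not from this thread):
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent_flow(files: list[str]):
    for file in files:
        # timeout=0 returns immediately instead of blocking until the
        # subflow run finishes, so the runs can proceed in parallel
        run_deployment(
            name="process-file/process-file-deploy",  # "flow-name/deployment-name"
            parameters={"file": file},
            timeout=0,
        )
```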
*Oscar Björhn:*
@Daryl Sean Davis' solution with `run_deployment` (with `timeout=0`) is what I would have recommended in your case. I don't think there's anything simpler built in, but it's been a while since I last looked at the new capabilities.
*Daryl:*
@Sean Davis Interesting, I hadn't thought of that. To save me some trial and error, do you have a rough sketch of how that code looks, dividing up the deployment and the normal flow? (Also, will that subflow deployment technique work with subflows below the subflow? I have a dead_letter pattern for files that throw serious errors or obvious data quality problems.)
@Oscar Björhn
(So for example, are you creating a deployment of the subflow as one Python file with a `.serve` and then just calling it via a second file (run as a deployment) or a straight-up script?)
*Sean Davis:*
I create the deployment using:
```python
subflow.deploy(...)
```
Then I use the name of the subflow deployment in `run_deployment` in the parent flow.
I have no idea if this is the best way to do things, but it achieves the desired result of parallel flows, each running on its own infrastructure (in my case).
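For reference, a sketch of the deployment side under those assumptions (the flow, deployment, work pool, and image names are all hypothetical; `flow.deploy` here targets a Docker-type work pool, which requires an image):
```python
from prefect import flow

@flow
def process_file(file: str):
    ...  # per-file processing tasks would live here

if __name__ == "__main__":
    # register the subflow as a deployment on a work pool;
    # run_deployment can then refer to it by name
    process_file.deploy(
        name="process-file-deploy",
        work_pool_name="my-docker-pool",
        image="my-image:latest",
    )
```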
*Daryl:*
@Sean Davis Thanks, this is helpful context. I did not know about `subflow.deploy`, so I'll experiment a bit today and see if that manages to sort the issue for me. At this point, even though I like the way Prefect handles (well, basically everything), if I can't get the parallelism issue sorted by the end of the weekend, I'll need to fall back to Airflow until Prefect handles this more natively with `flow.serve`/`submit`, etc. (Personally, I find it strange this isn't way easier to sort out in Prefect, since I imagine it's a basic use case for most people, i.e. take this subflow that has dependent tasks and run a bunch of them at the same time to speed things up.)
@Sean Davis @Oscar Björhn Have either of you checked out this "Work Pools" feature? I imagine the idea is that you deploy a subflow to it, and then it concurrently ramps up your resources by... perhaps creating more Docker containers or such? I've been trying to puzzle it out. We have a big meaty on-prem server we can use for this processing, but I wondered if that's the "new way" to do this. It's kind of the opposite of the way I wanted to go, since the processing pipeline only takes about 90s (and is probably throttled by the API calls I need to make), but I'm curious how other people must be doing this as well.
*Sean Davis:*
The deployment includes a reference to a work pool. You can think of a work pool as a job queue, but with workers attached to it. When I do a `run_deployment`, a new subflow run is scheduled into the work pool. Then, based on the work pool type, the job runs on an available worker, or one is created in the case of serverless workers.
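For reference, a work pool like that can be created and served from the Prefect CLI; a rough sketch, with the pool name and the concurrency limit of 20 as illustrative values:
```bash
# create a Docker-type work pool and cap it at 20 concurrent runs
prefect work-pool create my-docker-pool --type docker
prefect work-pool set-concurrency-limit my-docker-pool 20

# start a worker that polls the pool and launches flow runs
prefect worker start --pool my-docker-pool
```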
*Daryl:*
@Sean Davis Well, I made some progress here. I've got the deployment to, well... deploy... and attached (as a test) a 20-concurrency work pool (Docker-based) to try to parallelize the flow. Just kicked the puppy off, but it is still serving the science processing in serial. Just sanity-checking what I've done, the basic approach I've taken is this:
```python
if __name__ == "__main__":
    atlas_deploy = atlas_ingest.to_deployment(name="atlas_deploy")
    sci_backend = sci_backend_processing.to_deployment(name="sci_backend")
    # serve() blocks until stopped, so the run_deployment calls
    # below are never reached
    serve(atlas_deploy, sci_backend)
    # note: run_deployment expects a "flow-name/deployment-name" string,
    # not the deployment object itself
    run_deployment(sci_backend, timeout=0)
    run_deployment(atlas_deploy)
```
Is this the rough approach to take? The ingest takes a file listing and then just `for file in files` sends each one to the science processing backend, but I'm unsure if you need a different approach when it's a deployment.
@Oscar Björhn
(Feel like I'm getting closer though... at least it's running in serial again... =] )
Perhaps a better question might be: how do I pass the `file in files` to `run_deployment` and have it respect the deployment concurrency in the work queue? (I dropped back to using the default agent queue and made that concurrency 20 to start testing.)
What I need is the equivalent of `task_fn.submit(x)`, but for flows... 😕
*Sean Davis:*
You'd do something like:
```python
for file in files:
    run_deployment('sci_backend', timeout=0)
```
If you have parameters to pass, just add `parameters={...}` inside `run_deployment()`.