Daryl (01/26/2024, 9:03 AM):
There was a `flow.submit` feature request, but I can't find docs or a solid answer on where this functionality stands.
In sum, I'm trying to take a list of files generated from a directory read and then submit that to another flow for processing, in task sequence, up to the max concurrency limit (to replace what I have in Airflow). I want the second flow/subflow to be the one that runs in parallel (or, minimally, concurrently).
Is this possible in Prefect (without async)? Can someone point me to the docs or a code example? It is tricky to tell with Prefect 2.0 whether this works with `flow.submit` and a subflow.

Oscar Björhn (01/26/2024, 10:57 AM):

Oscar Björhn (01/26/2024, 10:57 AM):
Daryl (01/26/2024, 1:45 PM):
`docker compose`
I'd love a pointer if you've got something, but I'm honestly stumped on how to make this work as a subflow off the main flow concurrently. Do you have any code examples? I really just need to see how someone did it to make it work.
Sean Davis (01/26/2024, 1:56 PM):
Use `run_deployment(..., timeout=0)` on the subflow deployment. The `timeout=0` turns the `run_deployment` call into a non-blocking operation.
Sean Davis (01/26/2024, 1:57 PM):

Oscar Björhn (01/26/2024, 3:16 PM):

Daryl (01/26/2024, 4:18 PM):
Daryl (01/26/2024, 4:41 PM):
`.serve` and then just calling it via a second file (run as a deployment), or a straight-up script?
Sean Davis (01/26/2024, 5:48 PM):
`subflow.deploy(...)`. Then I use the name of the subflow deployment in `run_deployment` in the parent flow.
Sean Davis (01/26/2024, 5:49 PM):
Daryl (01/27/2024, 2:43 AM):
`subflow.deploy` is new to me, so I'll experiment a bit today and see if that manages to sort the issue for me.
At this point, even though I like the way Prefect handles basically everything, if I can't get the parallelism issue sorted by the end of the weekend, I'll need to fall back to Airflow until Prefect handles this more natively with `flow.serve`/`submit`, etc.
(Personally, I find it strange this isn't far easier to do in Prefect, since I imagine it's a basic use case for most people: take a subflow with dependent tasks and run a bunch of them at the same time to speed things up.)
Daryl (01/27/2024, 2:50 AM):

Sean Davis (01/27/2024, 4:07 AM):
Daryl (01/27/2024, 7:42 AM):

```python
if __name__ == "__main__":
    atlas_deploy = atlas_ingest.to_deployment(name="atlas_deploy")
    sci_backend = sci_backend_processing.to_deployment(name="sci_backend")
    serve(atlas_deploy, sci_backend)
    run_deployment(sci_backend, timeout=0)
    run_deployment(atlas_deploy)
```

Is this the rough approach to take? The ingest takes a files listing and then just `for file in files` sends each one to the science-processing backend, but I'm unsure if you need a different approach if it's a deployment.
@Oscar Björhn
Daryl (01/27/2024, 7:45 AM):
Daryl (01/27/2024, 9:42 AM):
Do I just pass each `file in files` to `run_deployment` to have it respect the deployment concurrency in the work queue? (I dropped back to using the default-agent queue and set its concurrency to 20 to start testing.)
Daryl (01/27/2024, 10:24 AM):
Sean Davis (01/27/2024, 2:49 PM):

```python
for file in files:
    run_deployment('sci_backend', timeout=0)
```

If you have parameters to pass, just add `parameters={ ... }` inside `run_deployment()`.