<@ULVA73B9P> How do I perform a fork and join with...
# marvin-in-the-wild
m
@Marvin How do I perform a fork and join with async run_deployment("flow/deployment", timeout=0)? I need the results of all the flows.
m
To perform a fork and join with asynchronous flows and collect results from all of them, create a parent flow that triggers the other flows (the "fork") and then waits for all of them to complete before aggregating their results (the "join"). Calling run_deployment with timeout=0 makes each call non-blocking: it returns right away instead of waiting for the run to finish. To wait for these runs and collect their results, you can use async and await in Prefect 2.x: submit each flow run, await the completion of their states, and aggregate the results once all runs are complete.
For examples of running work asynchronously and handling the results, see the documentation on upstream dependencies, where .submit() is used to run tasks asynchronously and the wait_for keyword is used to orchestrate dependencies: Specifying Upstream Dependencies. The documentation on results, which uses await state.result(fetch=True) to fetch the results of async tasks, may also be useful: Working with Flow and Task Results.
Although you asked about run_deployment, these documentation pages do not cover it in the context of asynchronous execution. For a complete example of run_deployment handling multiple flows asynchronously, it may be necessary to search further in the documentation or GitHub issues. If you need more information, let me know and I'll keep looking for relevant examples.
m
How would I wait for the state of multiple run_deployment("flow/deployment", timeout=0) calls when they return a FlowRun object immediately?
n
If you want to wait for them, I wouldn't use timeout=0, since that's how you avoid waiting for them. I would just:
await asyncio.gather(*[run_deployment(...) for _ in whatever])
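A minimal runnable sketch of that one-liner, assuming each awaited call returns only once its run has finished. run_deployment_stub and its return value are hypothetical stand-ins so the example runs without Prefect. In real code the awaited call would be run_deployment with the default timeout, and each result would have to be read from the returned FlowRun's state (for example with await state.result(fetch=True), as mentioned earlier in the thread).

```python
import asyncio


async def run_deployment_stub(name: str, parameters: dict) -> dict:
    """Hypothetical stand-in for an awaited run_deployment(...) call that
    blocks until the run is done (i.e. timeout=0 is NOT passed)."""
    await asyncio.sleep(0.01)  # simulate the time the flow run takes
    return {"deployment": name, "value": parameters["x"] * 2}


async def fork_and_join(name: str, xs: list) -> list:
    # Fork: build one awaitable per run.
    # Join: gather waits for all of them and returns results in input order.
    return await asyncio.gather(
        *[run_deployment_stub(name, {"x": x}) for x in xs]
    )
```

Because asyncio.gather runs the awaitables concurrently, the total wait is roughly that of the slowest run rather than the sum of all runs.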
m
Thank you