# marvin-ai
@Marvin I want to write a flow that triggers a number of other flow runs and then waits for them to be completed. Currently I am using code like this:
@task
async def launch_and_wait(x):
    runs = await asyncio.gather(
        run_deployment(name="flow-a/dep-a", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-b/dep-b", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-c/dep-c", parameters={"p": x}, timeout=0),
    )
    await asyncio.sleep(1)
    finished_runs = asyncio.gather(*[wait_for_flow_run(i.id) for i in runs])
    if any([i.state is None or i.state.type != StateType.COMPLETED for i in finished_runs]):
        raise ValueError("Not all sourcing flows successfully completed")
This only works with the `await asyncio.sleep()`, but that feels like bad practice. Why does this process hang without the `.sleep(1)`?
thought for 226.6 seconds
You’re very close! A few notes for Prefect 3.x:
- `run_deployment` is sync-compatible and will wait for completion by default. Passing `timeout=0` is the right way to “fire-and-return” immediately with a `FlowRun` so you can wait concurrently yourself.
- `wait_for_flow_run` is async and returns the final `FlowRun` (with its terminal state).
- In your snippet, you forgot to await the second `gather`, so you’re checking a Future instead of the finished runs.
- Prefer checking `state.is_completed()` instead of comparing to `StateType.COMPLETED`.

Here’s a clean pattern you can drop in:
import asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
async def launch_and_wait(x: int):
    # Trigger child runs; timeout=0 returns immediately (no waiting)
    children = await asyncio.gather(
        run_deployment(name="flow-a/dep-a", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-b/dep-b", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-c/dep-c", parameters={"p": x}, timeout=0),
    )

    # Wait for all children to finish concurrently
    finished = await asyncio.gather(
        *(wait_for_flow_run(r.id, log_states=True) for r in children)
    )

    # Fail the parent if any child did not complete successfully
    if not all(fr.state and fr.state.is_completed() for fr in finished):
        raise ValueError("Not all sourcing flows successfully completed")
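If it helps, here is a minimal way to kick off the parent locally, assuming the snippet above lives in a script and the three deployments already exist (the argument 42 is just an example value):

import asyncio

if __name__ == "__main__":
    # The parent flow is an async function, so it needs an event loop.
    # The child runs still execute wherever their deployments are served;
    # the parent only triggers them and waits on their results.
    asyncio.run(launch_and_wait(42))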
Notes and options:
- If you want the parent to wait inline for a single deployment, just call `await run_deployment(..., timeout=<seconds>)` without using `wait_for_flow_run`.
- If you want a global timeout for all children, wrap the second `gather` with `asyncio.wait_for(...)` (see the sketch below).
- By default, `run_deployment` links runs as subflows (`as_subflow=True`), so you’ll see the hierarchy in the UI.

Docs for reference:
- Run deployments (how-to)
- run_deployment API
- wait_for_flow_run API
- States overview

Want the parent to fail fast and cancel the other children if any one fails? If so, I can share a variant that cancels outstanding runs as soon as the first failure is observed.
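As a rough sketch of the global-timeout option mentioned above, reusing the same deployments (the `deadline_s` parameter name and its one-hour default are illustrative, not anything Prefect prescribes):

import asyncio

from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
async def launch_and_wait_with_deadline(x: int, deadline_s: float = 3600):
    # Trigger the child runs without waiting, exactly as before
    children = await asyncio.gather(
        run_deployment(name="flow-a/dep-a", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-b/dep-b", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-c/dep-c", parameters={"p": x}, timeout=0),
    )

    # Bound the concurrent wait: if the children are not all finished within
    # deadline_s seconds, asyncio.TimeoutError propagates and fails the parent.
    finished = await asyncio.wait_for(
        asyncio.gather(*(wait_for_flow_run(r.id, log_states=True) for r in children)),
        timeout=deadline_s,
    )

    if not all(fr.state and fr.state.is_completed() for fr in finished):
        raise ValueError("Not all sourcing flows successfully completed")

Note that timing out cancels the local waiting tasks, not the child flow runs themselves.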
When I run this,
await asyncio.gather(
        *(wait_for_flow_run(r.id, log_states=True) for r in children)
    )
it never ends even though the child flows have completed. This does work if there is just one flow. Is there an issue with async and webhooks that prevents this from working?