Sam Thomas
10/27/2025, 11:27 AM
@task
async def launch_and_wait(x):
    runs = await asyncio.gather(
        run_deployment(name="flow-a/dep-a", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-b/dep-b", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-c/dep-c", parameters={"p": x}, timeout=0),
    )
    await asyncio.sleep(1)
    finished_runs = asyncio.gather(*[wait_for_flow_run(i.id) for i in runs])
    if any([i.state is None or i.state.type != StateType.COMPLETED for i in finished_runs]):
        raise ValueError("Not all sourcing flows successfully completed")
This only works with the await asyncio.sleep(1), but that feels like bad practice. Why does this process hang without the .sleep(1)?
Marvin
10/27/2025, 11:27 AM
Marvin
10/27/2025, 11:31 AM
- run_deployment is sync-compatible and will wait for completion by default. Passing timeout=0 is the right way to “fire-and-return” immediately with a FlowRun so you can wait concurrently yourself.
- wait_for_flow_run is async and returns the final FlowRun (with its terminal state).
- In your snippet, the second gather is never awaited, so the check runs against a Future instead of the finished runs.
- Prefer checking state.is_completed() instead of comparing to StateType.COMPLETED.
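To illustrate the missing await outside of Prefect, here is a minimal sketch using plain asyncio. The fetch coroutine is a hypothetical stand-in for an awaitable such as wait_for_flow_run and is not part of the original snippet; the sketch only shows that gather hands back a Future until it is awaited.

```python
import asyncio


async def fetch(n: int) -> int:
    # Hypothetical stand-in for an awaitable such as wait_for_flow_run(...)
    await asyncio.sleep(0.01)
    return n * 2


async def main():
    # Without "await", gather() only schedules the work and returns a Future.
    pending = asyncio.gather(fetch(1), fetch(2))
    is_future = asyncio.isfuture(pending)
    done_before_await = pending.done()

    # Awaiting the Future is what yields the list of results.
    results = await pending
    return is_future, done_before_await, results
```

Calling asyncio.run(main()) returns the two flags plus the result list, so any check on the individual results belongs after the await.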
Here’s a clean pattern you can drop in:
import asyncio

from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run


@flow
async def launch_and_wait(x: int):
    # Trigger child runs; timeout=0 returns immediately (no waiting)
    children = await asyncio.gather(
        run_deployment(name="flow-a/dep-a", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-b/dep-b", parameters={"p": x}, timeout=0),
        run_deployment(name="flow-c/dep-c", parameters={"p": x}, timeout=0),
    )

    # Wait for all children to finish concurrently
    finished = await asyncio.gather(
        *(wait_for_flow_run(r.id, log_states=True) for r in children)
    )

    # Fail the parent if any child did not complete successfully
    if not all(fr.state and fr.state.is_completed() for fr in finished):
        raise ValueError("Not all sourcing flows successfully completed")
Notes and options:
- If you want the parent to wait inline for a single deployment, just call await run_deployment(..., timeout=<seconds>) without using wait_for_flow_run.
- If you want a global timeout for all children, wrap the second gather with asyncio.wait_for(...).
- By default, run_deployment links runs as subflows (as_subflow=True) so you’ll see the hierarchy in the UI.
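The global-timeout option above could look roughly like the following sketch. It uses only the standard library: wait_for_child is a hypothetical stand-in for wait_for_flow_run, and the names wait_for_all, durations, and overall_timeout are assumptions made for illustration, not Prefect APIs.

```python
import asyncio
from typing import Dict, List, Optional


async def wait_for_child(name: str, seconds: float) -> str:
    # Hypothetical stand-in for wait_for_flow_run(...); it just sleeps.
    await asyncio.sleep(seconds)
    return name


async def wait_for_all(
    durations: Dict[str, float], overall_timeout: float
) -> Optional[List[str]]:
    # One gather over all children, bounded by a single overall timeout.
    waiters = asyncio.gather(
        *(wait_for_child(name, secs) for name, secs in durations.items())
    )
    try:
        return await asyncio.wait_for(waiters, timeout=overall_timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the gather (and its children) on timeout.
        return None
```

Returning None on timeout is just one design choice for the sketch; in a real parent flow you would more likely raise so the parent run is marked as failed.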
Docs for reference:
- Run deployments (how-to)
- run_deployment API
- wait_for_flow_run API
- States overview
Want the parent to fail fast and cancel the other children if any one fails? If so, I can share a variant that cancels outstanding runs as soon as the first failure is observed.
Sam Thomas
10/27/2025, 11:35 AM
await asyncio.gather(
    *(wait_for_flow_run(r.id, log_states=True) for r in children)
)
never ends even though the child flows have completed. This does work if there is just one flow. Is there an issue with async and webhooks that prevents this working?