Chris Killpack
09/15/2025, 7:25 PMMarvin
09/15/2025, 7:25 PMMarvin
09/15/2025, 7:29 PMFlowRun
.
import asyncio
from uuid import UUID
from prefect.flow_runs import wait_for_flow_run
from prefect.client.schemas.objects import StateType
from prefect.exceptions import FlowRunWaitTimeout
async def main(flow_run_id: UUID):
    """Wait for a flow run to finish and print its final state.

    Uses ``wait_for_flow_run`` with a 2-second poll interval and a
    30-minute timeout. If the timeout elapses first, a message is printed
    and the function returns without reporting a state.

    Args:
        flow_run_id: ID of the flow run to wait for.
    """
    try:
        flow_run = await wait_for_flow_run(
            flow_run_id,
            poll_interval=2,  # seconds
            timeout=60 * 30,  # 30 minutes
            log_states=True,  # optional: logs transitions
        )
    except FlowRunWaitTimeout:
        print("Timed out waiting for the flow run to finish.")
        return

    final_state = flow_run.state
    # NOTE(review): `state` appears to be optional on the client-side FlowRun
    # schema; guard so a missing state does not surface as AttributeError.
    if final_state is None:
        print("Flow run has no recorded state.")
        return

    print("Final state:", final_state.type, final_state.name)
    if final_state.type == StateType.COMPLETED:
        print("Success!")
    elif final_state.type in {StateType.FAILED, StateType.CANCELLED, StateType.CRASHED}:
        print("Flow run ended in a failure-like terminal state.")
if __name__ == "__main__":
    # The literal below is a placeholder, not a valid UUID string; replace it
    # with a real flow run ID before running.
    asyncio.run(main(UUID("YOUR-FLOW-RUN-ID-HERE")))
- Source for `wait_for_flow_run`: prefect/flow_runs.py
2) Manually poll via the client
If you want to control the polling loop yourself:
import asyncio
from uuid import UUID
from prefect import get_client
from prefect.client.schemas.objects import StateType
# State types this script treats as terminal: polling stops once one is seen.
TERMINAL = {StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED, StateType.CRASHED}


async def poll_flow_run(flow_run_id: UUID, interval: float = 2.0):
    """Poll a flow run through the client until it reaches a terminal state.

    Args:
        flow_run_id: ID of the flow run to poll.
        interval: Seconds to sleep between consecutive reads.

    Returns:
        The flow run object from the read that first showed a state type
        in ``TERMINAL``.
    """
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            state = flow_run.state

            # NOTE(review): `state` appears to be optional on the client-side
            # FlowRun schema; treat a missing state as "not terminal yet"
            # rather than failing on attribute access.
            if state is None:
                print("State: not yet available")
            else:
                print(f"State: {state.type} ({state.name})")
                if state.type in TERMINAL:
                    print("Reached terminal state.")
                    return flow_run
                # Optional: check if running explicitly
                if state.type == StateType.RUNNING:
                    print("Still running...")

            await asyncio.sleep(interval)
if __name__ == "__main__":
    # The literal below is a placeholder, not a valid UUID string; replace it
    # with a real flow run ID before running.
    asyncio.run(poll_flow_run(UUID("YOUR-FLOW-RUN-ID-HERE")))
Useful references
- REST API reference for flow run state (helps understand terminal vs non-terminal): Read flow run state (REST)
- `wait_for_flow_run` source (shows timeout, poll_interval, logging, return type): prefect/flow_runs.py
If you share your exact use case (e.g., needing a sync wrapper, custom logging, or early-exit conditions), I can tailor the snippet.
Nate
09/15/2025, 7:30 PM
`run_deployment` waits for you by default, which we could potentially make clearer in the docs.
Chris Killpack
09/15/2025, 7:46 PMNate
09/15/2025, 7:52 PM
import asyncio
from uuid import uuid4
from prefect import flow
from prefect.flow_engine import run_flow_async
from prefect.flow_runs import FlowRunWaiter
@flow
async def test_flow():
    """Demo flow: sleep for five seconds, then print a completion message."""
    await asyncio.sleep(5)
    print("Done!")
async def main():
    """Start ``test_flow`` in the background and wait for it to finish.

    A flow run ID is generated up front so the same ID can be handed to both
    the engine call and ``FlowRunWaiter``.
    """
    flow_run_id = uuid4()

    # asyncio has no `create_flow`; `create_task` is the call that schedules a
    # coroutine on the running loop. Keep a reference to the task so it cannot
    # be garbage-collected while the flow is still executing.
    run_task = asyncio.create_task(  # noqa: F841 - held to keep the task alive
        run_flow_async(flow=test_flow, flow_run_id=flow_run_id)
    )

    await FlowRunWaiter.wait_for_flow_run(flow_run_id)
    print("Flow run finished")
if __name__ == "__main__":
    # Script entry point: run the async demo on a fresh event loop.
    asyncio.run(main())
This starts a websocket client and blocks until the run reaches a terminal state (i.e., no need to poll):
await FlowRunWaiter.wait_for_flow_run(flow_run_id)
Nate
09/15/2025, 7:55 PMChris Killpack
09/15/2025, 9:44 PM