Chris Killpack
09/15/2025, 7:25 PM
Marvin
09/15/2025, 7:25 PM
Marvin
09/15/2025, 7:29 PM
FlowRun
import asyncio
from uuid import UUID
from prefect.flow_runs import wait_for_flow_run
from prefect.client.schemas.objects import StateType
from prefect.exceptions import FlowRunWaitTimeout
async def main(flow_run_id: UUID):
    """Wait for a flow run to finish and print how it ended.

    Args:
        flow_run_id: ID of the flow run to wait on.

    Returns:
        None. If ``wait_for_flow_run`` raises ``FlowRunWaitTimeout`` a
        timeout notice is printed and the function returns early; otherwise
        the final state's type and name are printed, followed by a one-line
        verdict for completed or failure-like states.
    """
    wait_options = {
        "poll_interval": 2,  # seconds
        "timeout": 60 * 30,  # 30 minutes
        "log_states": True,  # optional: logs transitions
    }
    try:
        finished_run = await wait_for_flow_run(flow_run_id, **wait_options)
    except FlowRunWaitTimeout:
        print("Timed out waiting for the flow run to finish.")
        return

    outcome = finished_run.state
    print("Final state:", outcome.type, outcome.name)

    if outcome.type == StateType.COMPLETED:
        print("Success!")
        return

    failure_like = {StateType.FAILED, StateType.CANCELLED, StateType.CRASHED}
    if outcome.type in failure_like:
        print("Flow run ended in a failure-like terminal state.")
if __name__ == "__main__":
    asyncio.run(main(UUID("YOUR-FLOW-RUN-ID-HERE")))
import asyncio
from uuid import UUID
from prefect import get_client
from prefect.client.schemas.objects import StateType
TERMINAL = {StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED, StateType.CRASHED}
async def poll_flow_run(flow_run_id: UUID, interval: float = 2.0):
    """Poll a flow run until its state type is one of ``TERMINAL``.

    Each iteration reads the flow run through the client, prints its current
    state type and name, and either returns or sleeps before reading again.

    Args:
        flow_run_id: ID of the flow run to poll.
        interval: Seconds to sleep between reads.

    Returns:
        The flow run object from the read that first showed a terminal state.
    """
    async with get_client() as api:
        while True:
            snapshot = await api.read_flow_run(flow_run_id)
            kind, label = snapshot.state.type, snapshot.state.name
            print(f"State: {kind} ({label})")

            if kind not in TERMINAL:
                # Optional: call out the explicitly-running case.
                if kind == StateType.RUNNING:
                    print("Still running...")
                await asyncio.sleep(interval)
                continue

            print("Reached terminal state.")
            return snapshot
if __name__ == "__main__":
    asyncio.run(poll_flow_run(UUID("YOUR-FLOW-RUN-ID-HERE")))
wait_for_flow_run
Nate
09/15/2025, 7:30 PM
run_deployment
Chris Killpack
09/15/2025, 7:46 PM
Nate
09/15/2025, 7:52 PM
    import asyncio
    from uuid import uuid4
    from prefect import flow
    from prefect.flow_engine import run_flow_async
    from prefect.flow_runs import FlowRunWaiter
    # Demo flow: sleeps for five seconds, then prints a completion message.
    @flow
    async def test_flow():
        pause_seconds = 5
        await asyncio.sleep(pause_seconds)
        print("Done!")
    async def main():
        """Start ``test_flow`` in the background and wait for its run to finish.

        A flow run ID is generated up front so the same ID can be handed to
        both the engine call and ``FlowRunWaiter.wait_for_flow_run``. Prints
        "Flow run finished" once the waiter returns.
        """
        flow_run_id = uuid4()
        # ``asyncio.create_task`` schedules the coroutine on the running loop
        # (asyncio has no ``create_flow``). The task is bound to a name so it
        # is not garbage-collected while it is still executing.
        # NOTE(review): confirm that ``run_flow_async`` accepts a
        # ``flow_run_id`` keyword in the installed Prefect version.
        engine_task = asyncio.create_task(
            run_flow_async(flow=test_flow, flow_run_id=flow_run_id)
        )
        await FlowRunWaiter.wait_for_flow_run(flow_run_id)
        print("Flow run finished")
    if __name__ == "__main__":
        asyncio.run(main())
await FlowRunWaiter.wait_for_flow_run(flow_run_id)
Nate
09/15/2025, 7:55 PM
Chris Killpack
09/15/2025, 9:44 PM