<@ULVA73B9P> how do i use the python client and ap...
# ask-marvin
@Marvin how do i use the python client and api to retry a crashed run
thought for 440.1 seconds
Great question. In Prefect 3.x, there are two practical ways to “retry” a crashed flow run with the Python client:
1) Requeue the same flow run by forcing its state back to SCHEDULED (workers will pick it up again).
2) Create a brand new flow run from the same deployment (recommended, and what the UI’s “Run again” does).

Notes up front
- Workers poll for flow runs in the SCHEDULED state whose scheduled time is due. A run set to PENDING will not be picked up by workers; use SCHEDULED with a `scheduled_time` of “now”.
- Forcing state transitions from terminal states (e.g., CRASHED/FAILED) requires `force=True`.
- `resume_flow_run` is only for paused runs, not crashed runs.
- Task runs cannot be re-executed independently of a flow run; retries should be configured on tasks (e.g., `@task(retries=3, retry_delay_seconds=60)`).

Option A: Requeue the same flow run (set it back to SCHEDULED)
This keeps the same `flow_run_id` and re-executes the flow from the start. Make sure the deployment/worker is healthy so it can pick up the run.

Async
```python
import asyncio
from datetime import datetime, timezone

from prefect.client.orchestration import get_client
from prefect.states import Scheduled

async def requeue_flow_run(flow_run_id: str):
    async with get_client() as client:
        # Schedule the run for "now" so a worker picks it up on its next poll
        state = Scheduled(scheduled_time=datetime.now(timezone.utc))
        result = await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=state,
            force=True,  # required to move from CRASHED/FAILED -> SCHEDULED
        )
        print(f"Set state result: {result.status}")  # e.g. ACCEPT

if __name__ == "__main__":
    asyncio.run(requeue_flow_run("<your-flow-run-id>"))
```
Sync
```python
from datetime import datetime, timezone

from prefect.client.orchestration import get_client
from prefect.states import Scheduled

def requeue_flow_run_sync(flow_run_id: str):
    # sync_client=True returns a SyncPrefectClient configured from the current settings
    with get_client(sync_client=True) as client:
        state = Scheduled(scheduled_time=datetime.now(timezone.utc))
        result = client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=state,
            force=True,  # required to move from CRASHED/FAILED -> SCHEDULED
        )
        print(f"Set state result: {result.status}")

requeue_flow_run_sync("<your-flow-run-id>")
```
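Since `force=True` skips the usual orchestration checks, it can be worth confirming that the run really is in a crashed or failed state before requeueing it. Here is a minimal sketch of such a guard. The names `requeue_if_crashed`, `make_scheduled_state`, and `RETRYABLE_STATE_TYPES` are hypothetical helpers, not part of Prefect, and the sketch assumes the state type and result status are exposed as the uppercase strings `"CRASHED"`, `"FAILED"`, and `"ACCEPT"` (or as enums with those values). The client is passed in, so the only client calls used are `read_flow_run` and `set_flow_run_state` from the snippets above.

```python
from typing import Any, Callable

# Hypothetical allow-list: only these state types get forced back to SCHEDULED.
RETRYABLE_STATE_TYPES = {"CRASHED", "FAILED"}


def _as_text(value: Any) -> str:
    """Return an enum member's value, or the value itself if it is already a string."""
    return str(getattr(value, "value", value))


async def requeue_if_crashed(
    client: Any,
    flow_run_id: str,
    make_scheduled_state: Callable[[], Any],
) -> bool:
    """Force a run back to SCHEDULED only if it is currently crashed or failed.

    Returns True if the state change was accepted, False if the run was
    skipped or the API did not accept the change.
    """
    flow_run = await client.read_flow_run(flow_run_id)
    state = flow_run.state
    if state is None or _as_text(state.type) not in RETRYABLE_STATE_TYPES:
        return False  # leave running, completed, or scheduled runs untouched

    result = await client.set_flow_run_state(
        flow_run_id=flow_run_id,
        state=make_scheduled_state(),
        force=True,  # needed to leave a terminal state
    )
    return _as_text(result.status) == "ACCEPT"
```

In real use you would call it inside `async with get_client() as client:` and pass `lambda: Scheduled(scheduled_time=datetime.now(timezone.utc))` as `make_scheduled_state`.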
Option B: Create a new flow run from the same deployment (recommended)
This mirrors the UI “Run again”. It’s usually cleaner and avoids quirks around reusing the same run.

Async
```python
import asyncio
from datetime import datetime, timezone

from prefect.client.orchestration import get_client
from prefect.states import Scheduled

async def rerun_from_same_deployment(flow_run_id: str):
    async with get_client() as client:
        # Load the original run to find its deployment and parameters
        fr = await client.read_flow_run(flow_run_id)
        if not fr.deployment_id:
            raise RuntimeError(
                "Original flow run has no deployment_id; create a run from "
                "a deployment or requeue the same run instead."
            )

        # Use original parameters if available; otherwise pass your own
        params = getattr(fr, "parameters", {}) or {}

        new_fr = await client.create_flow_run_from_deployment(
            fr.deployment_id,
            parameters=params,
            state=Scheduled(scheduled_time=datetime.now(timezone.utc)),
        )
        print(f"New flow run created: {new_fr.id}")

if __name__ == "__main__":
    asyncio.run(rerun_from_same_deployment("<your-flow-run-id>"))
```
Sync
```python
from datetime import datetime, timezone

from prefect.client.orchestration import get_client
from prefect.states import Scheduled

def rerun_from_same_deployment_sync(flow_run_id: str):
    # sync_client=True returns a SyncPrefectClient configured from the current settings
    with get_client(sync_client=True) as client:
        # Load the original run to find its deployment and parameters
        fr = client.read_flow_run(flow_run_id)
        if not fr.deployment_id:
            raise RuntimeError(
                "Original flow run has no deployment_id; create a run from "
                "a deployment or requeue the same run instead."
            )

        # Use original parameters if available; otherwise pass your own
        params = getattr(fr, "parameters", {}) or {}

        new_fr = client.create_flow_run_from_deployment(
            fr.deployment_id,
            parameters=params,
            state=Scheduled(scheduled_time=datetime.now(timezone.utc)),
        )
        print(f"New flow run created: {new_fr.id}")

rerun_from_same_deployment_sync("<your-flow-run-id>")
```

About task-run retries
- You can’t re-run an individual task on its own after a crash; tasks are executed inside the flow run’s engine process, not independently by workers.
- Configure task retries in code, e.g.:
```python
from prefect import task

@task(retries=3, retry_delay_seconds=60)
def my_task():
    ...
```
This way, if a task fails during the flow run, Prefect will retry it automatically.

Which option should you use?
- If you want a clean, auditable “new” run (typical operational practice), create a new flow run from the same deployment (Option B).
- If you specifically need to keep the same `flow_run_id` (e.g., external systems refer to it), you can force the state back to `SCHEDULED` (Option A).

Quick check
- Do you have the `flow_run_id`, and was the run created from a deployment? If you share that, I can tailor the snippet exactly to your case.
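While you check, the choice between the two options can be written down as a tiny helper. This is only an illustrative sketch: `choose_retry_option` and its two boolean inputs are hypothetical names, and the rule simply encodes the guidance above (prefer a new run, and fall back to requeueing when the same `flow_run_id` must be kept or there is no deployment to create a new run from).

```python
def choose_retry_option(has_deployment: bool, must_keep_flow_run_id: bool) -> str:
    """Return "A" (requeue the same run) or "B" (new run from the deployment)."""
    if must_keep_flow_run_id:
        return "A"  # external systems refer to the existing flow_run_id
    if has_deployment:
        return "B"  # clean, auditable new run from the same deployment
    return "A"  # no deployment_id, so a new run cannot be created from one


if __name__ == "__main__":
    print(choose_retry_option(has_deployment=True, must_keep_flow_run_id=False))
```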