Lee Trout
09/20/2025, 12:18 AM
Marvin
09/20/2025, 12:18 AM
Marvin
09/20/2025, 12:25 AM
force=True.
- resume_flow_run is only for paused runs, not crashed runs.
- Task runs cannot be re-executed independently of a flow run; retries should be configured on tasks (e.g., @task(retries=3, retry_delay_seconds=60)).
Option A — Requeue the same flow run (set it back to SCHEDULED)
This keeps the same flow_run_id and re-executes from the start. Make sure the deployment/worker is healthy so it can pick up the run.
Async
import asyncio
from datetime import datetime, timezone

from prefect.client.orchestration import get_client
from prefect.states import Scheduled


async def requeue_flow_run(flow_run_id: str):
    async with get_client() as client:
        state = Scheduled(scheduled_time=datetime.now(timezone.utc))
        result = await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=state,
            force=True,  # required to move from CRASHED/FAILED -> SCHEDULED
        )
        print(f"Set state result: {result.status}")  # e.g. "ACCEPT"


if __name__ == "__main__":
    asyncio.run(requeue_flow_run("<your-flow-run-id>"))
Sync
from datetime import datetime, timezone

from prefect.client.orchestration import get_client
from prefect.states import Scheduled


def requeue_flow_run_sync(flow_run_id: str):
    # get_client(sync_client=True) returns a configured SyncPrefectClient;
    # constructing SyncPrefectClient() directly would require passing the API URL.
    with get_client(sync_client=True) as client:
        state = Scheduled(scheduled_time=datetime.now(timezone.utc))
        result = client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=state,
            force=True,  # required to move from CRASHED/FAILED -> SCHEDULED
        )
        print(f"Set state result: {result.status}")


requeue_flow_run_sync("<your-flow-run-id>")
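After forcing the state, it can help to confirm that a worker actually picked the run up. Here is a minimal sketch, assuming the client exposes an async read_flow_run and that the returned run carries a state_type attribute, as the Prefect client used above does (verify against your version). The helper name wait_for_pickup and the timeout_s / poll_s parameters are illustrative, not Prefect APIs.

```python
import asyncio
import time


async def wait_for_pickup(client, flow_run_id, timeout_s=300.0, poll_s=5.0):
    """Poll until the run leaves SCHEDULED and return the state type seen.

    `client` is any object with an async `read_flow_run(flow_run_id)` method,
    for example the client yielded by `get_client()`.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        flow_run = await client.read_flow_run(flow_run_id)
        # state_type may be an enum or a plain string; normalize to a string.
        state_type = flow_run.state_type
        state_type = str(getattr(state_type, "value", state_type))
        if state_type != "SCHEDULED":
            return state_type
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Flow run {flow_run_id} is still SCHEDULED after {timeout_s}s"
            )
        await asyncio.sleep(poll_s)
```

You would call it inside the same async with get_client() block, right after set_flow_run_state. A timeout here usually points at the worker or work pool rather than at the run itself.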
Option B — Create a new flow run from the same deployment (recommended)
This mirrors the UI “Run again”. It’s usually cleaner and avoids quirks around reusing the same run.
Async
import asyncio
from datetime import datetime, timezone

from prefect.client.orchestration import get_client
from prefect.states import Scheduled


async def rerun_from_same_deployment(flow_run_id: str):
    async with get_client() as client:
        # Load the original run to find its deployment and parameters
        fr = await client.read_flow_run(flow_run_id)
        if not fr.deployment_id:
            raise RuntimeError(
                "Original flow run has no deployment_id; create a run from a "
                "deployment or requeue the same run instead."
            )
        # Use original parameters if available; otherwise pass your own
        params = getattr(fr, "parameters", {}) or {}
        new_fr = await client.create_flow_run_from_deployment(
            fr.deployment_id,
            parameters=params,
            state=Scheduled(scheduled_time=datetime.now(timezone.utc)),
        )
        print(f"New flow run created: {new_fr.id}")


if __name__ == "__main__":
    asyncio.run(rerun_from_same_deployment("<your-flow-run-id>"))
Sync
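```
from datetime import datetime, timezone

from prefect.client.orchestration import get_client
from prefect.states import Scheduled


def rerun_from_same_deployment_sync(flow_run_id: str):
    # get_client(sync_client=True) returns a configured SyncPrefectClient
    with get_client(sync_client=True) as client:
        fr = client.read_flow_run(flow_run_id)
        if not fr.deployment_id:
            raise RuntimeError(
                "Original flow run has no deployment_id; create a run from a "
                "deployment or requeue the same run instead."
            )
        params = getattr(fr, "parameters", {}) or {}
        # Assumed continuation, mirroring the async version (the message ends here)
        new_fr = client.create_flow_run_from_deployment(
            fr.deployment_id,
            parameters=params,
            state=Scheduled(scheduled_time=datetime.now(timezone.utc)),
        )
        print(f"New flow run created: {new_fr.id}")
```
Marvin
09/20/2025, 12:25 AM
About task-run retries
- You can't re-run individual tasks after a crash. Tasks execute inside the flow run's engine process rather than being picked up independently by workers, so a task can only run while its flow run is running.
- Configure task retries in code, e.g.:
```
from prefect import task


@task(retries=3, retry_delay_seconds=60)
def my_task():
    ...
```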
This way, if a task fails during the flow run, Prefect will retry it automatically.
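To make the retry settings concrete: with retries=3, a task gets its initial attempt plus up to three more, with retry_delay_seconds between attempts. The sketch below imitates that behavior in plain Python. It is only an illustration of the semantics, not Prefect's implementation, and run_with_retries and flaky are hypothetical names.

```python
import time


def run_with_retries(fn, retries=3, retry_delay_seconds=0.0):
    """Call fn(); on failure, retry up to `retries` more times.

    Returns (result, attempts_used). Re-raises the last error once the
    initial attempt and all retries have failed.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            if attempts > retries:  # initial attempt + `retries` retries used up
                raise
            time.sleep(retry_delay_seconds)


# Usage: a function that fails twice, then succeeds on the third attempt.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return "ok"


result, attempts_used = run_with_retries(flaky, retries=3)
```

Retries like this only help with failures inside a running flow. They do not bring back a flow run whose process has crashed, which is why the crashed run itself still needs Option A or Option B.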
Which option should you use?
- If you want a clean, auditable “new” run (typical operational practice), create a new flow run from the same deployment (Option B).
- If you specifically need to keep the same flow_run_id (e.g., external systems refer to it), you can force the state back to SCHEDULED (Option A).
Quick check
- Do you have the flow_run_id and was the run created from a deployment? If you share that, I can tailor the snippet exactly to your case.