Momo Chimes
10/06/2025, 2:48 PMMarvin
10/06/2025, 2:48 PMMarvin
10/06/2025, 2:54 PM
prefect flow-run ls --state CRASHED --limit 50
Programmatic bulk rerun (Sync client; copy/paste runnable):
from uuid import UUID
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
def rerun_all_crashed(limit_per_page: int = 200, max_runs: int | None = None):
    """Create a new flow run for every crashed flow run that has a deployment.

    Pages through flow runs whose state name is "Crashed". For each one that
    carries a ``deployment_id``, a new run is created from that deployment
    with the original parameters and tags and a ``-retry`` name suffix.
    Runs without a ``deployment_id`` are recorded as skipped.

    Args:
        limit_per_page: Page size passed to ``read_flow_runs``.
        max_runs: Stop as soon as this many reruns have been created.
            ``None`` (or ``0``) means no cap.

    Returns:
        A ``(created, skipped)`` tuple. ``created`` holds
        ``(original_run_id, new_run_id)`` pairs; ``skipped`` holds
        ``(original_run_id, reason)`` pairs.
    """
    # Local import: get_client is not among the snippet's top-level imports.
    from prefect.client.orchestration import get_client

    created = []
    skipped = []
    offset = 0

    # SyncPrefectClient cannot be constructed without an explicit `api`
    # argument; get_client(sync_client=True) builds a sync client from the
    # active profile's settings instead.
    with get_client(sync_client=True) as client:
        # State *names* are matched exactly and the stored name is
        # "Crashed", not the upper-case state type "CRASHED".
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(
                name=FlowRunFilterStateName(any_=["Crashed"])
            )
        )
        while True:
            batch = client.read_flow_runs(
                flow_run_filter=fr_filter,
                limit=limit_per_page,
                offset=offset,
            )
            if not batch:
                break
            for fr in batch:
                # Only rerun if this came from a deployment
                if not getattr(fr, "deployment_id", None):
                    skipped.append((fr.id, "no deployment_id on original run"))
                    continue
                params = (getattr(fr, "parameters", None) or {}).copy()
                tags = list(getattr(fr, "tags", []) or [])
                name = f"{(fr.name or str(fr.id))}-retry"
                new_fr = client.create_flow_run_from_deployment(
                    fr.deployment_id,
                    parameters=params,
                    name=name,
                    tags=tags,
                )
                created.append((fr.id, new_fr.id))
                if max_runs and len(created) >= max_runs:
                    return created, skipped
            # Creating reruns does not change the originals' state, so the
            # filtered result set is stable and offset paging is safe.
            offset += len(batch)
    return created, skipped
if __name__ == "__main__":
    # Run the bulk rerun with default paging and report a short summary.
    rerun_pairs, skipped_runs = rerun_all_crashed()
    print(f"Created {len(rerun_pairs)} reruns; skipped {len(skipped_runs)}")
    # Only dump the per-run skip reasons when something was skipped.
    if skipped_runs:
        print("Skipped details:", skipped_runs)
Notes:
- This creates new flow runs for each CRASHED run, copying parameters and tags. The new run IDs will be different.
- Runs that were not started from a deployment (no deployment_id) are skipped. If you have those, we can discuss options.
- You can narrow the selection (e.g., by a specific deployment) by adding a deployment filter to FlowRunFilter if you’d like.
Relevant docs:
- Create a run from a deployment: Create flow run from deployment
- Read/list flow runs: Flow runs API
If you prefer async, I can share that version too. Do you want to target all CRASHED runs in the workspace, or only specific flows/deployments?
Momo Chimes
10/06/2025, 3:05 PMMomo Chimes
10/06/2025, 3:05 PMMarvin
10/06/2025, 3:05 PMMarvin
10/06/2025, 3:07 PM
prefect config view
prefect version
2) Print the script’s settings to confirm they match the CLI
# The PREFECT_* names exported by prefect.settings are Setting objects whose
# .value() resolves against the currently active profile and environment.
# Attribute access on the object returned by get_current_settings() yields
# the plain value instead, which has no .value() method.
from prefect.settings import (
    PREFECT_API_KEY,
    PREFECT_API_URL,
    PREFECT_UI_URL,
    load_current_profile,
)

print("Profile:", load_current_profile().name)
print("API URL:", PREFECT_API_URL.value())
print("UI URL:", PREFECT_UI_URL.value())
# Report only whether a key is configured; never print the key itself.
print("API Key set:", bool(PREFECT_API_KEY.value()))
3) Minimal check: count CRASHED by state TYPE (not name)
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType

# SyncPrefectClient cannot be constructed without an explicit `api` argument;
# get_client(sync_client=True) builds a sync client from the active profile.
with get_client(sync_client=True) as client:
    # Filter on the state TYPE enum rather than the display name.
    runs = client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=["CRASHED"])
            )
        ),
        limit=50,
    )

print("Found crashed:", len(runs))
# Show a small sample: run id, state name, and originating deployment.
for r in runs[:5]:
    state_name = getattr(getattr(r, "state", None), "name", None)
    print(r.id, state_name, getattr(r, "deployment_id", None))
If that still prints 0, grab one flow run ID the CLI showed and try to read it directly; if this fails with a not-found (404) error, you’re pointed at a different workspace:
from uuid import UUID

from prefect.client.orchestration import get_client

# replace with an ID from `prefect flow-run ls --state CRASHED`
target_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

# SyncPrefectClient cannot be constructed without an explicit `api` argument;
# get_client(sync_client=True) builds a sync client from the active profile.
with get_client(sync_client=True) as client:
    fr = client.read_flow_run(UUID(target_id))
    print(fr)
Updated bulk rerun script (uses state TYPE filter and is a bit more defensive):
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
def rerun_all_crashed(limit_per_page: int = 200, max_runs: int | None = None):
    """Create a new flow run for every crashed flow run that has a deployment.

    Pages through flow runs whose state type is CRASHED. For each one that
    carries a ``deployment_id``, a new run is created from that deployment
    with the original parameters and tags and a ``-retry`` name suffix.
    Runs without a ``deployment_id`` are recorded as skipped.

    Args:
        limit_per_page: Page size passed to ``read_flow_runs``.
        max_runs: Stop as soon as this many reruns have been created.
            ``None`` (or ``0``) means no cap.

    Returns:
        A ``(created, skipped)`` tuple. ``created`` holds
        ``(original_run_id, new_run_id)`` pairs; ``skipped`` holds
        ``(original_run_id, reason)`` pairs.
    """
    # Local import: get_client is not among the snippet's top-level imports.
    from prefect.client.orchestration import get_client

    created, skipped = [], []
    offset = 0

    # SyncPrefectClient cannot be constructed without an explicit `api`
    # argument; get_client(sync_client=True) builds a sync client from the
    # active profile's settings instead.
    with get_client(sync_client=True) as client:
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=["CRASHED"])
            )
        )
        while True:
            batch = client.read_flow_runs(
                flow_run_filter=fr_filter,
                limit=limit_per_page,
                offset=offset,
            )
            if not batch:
                break
            for fr in batch:
                # Runs not started from a deployment cannot be recreated here.
                if not getattr(fr, "deployment_id", None):
                    skipped.append((fr.id, "no deployment_id on original run"))
                    continue
                params = (getattr(fr, "parameters", None) or {}).copy()
                tags = list(getattr(fr, "tags", []) or [])
                name = f"{(fr.name or str(fr.id))}-retry"
                new_fr = client.create_flow_run_from_deployment(
                    fr.deployment_id,
                    parameters=params,
                    name=name,
                    tags=tags,
                )
                created.append((fr.id, new_fr.id))
                if max_runs and len(created) >= max_runs:
                    return created, skipped
            # Creating reruns does not change the originals' state, so the
            # filtered result set is stable and offset paging is safe.
            offset += len(batch)
    return created, skipped
if __name__ == "__main__":
    # Run the bulk rerun with default paging and report a short summary.
    rerun_pairs, skipped_runs = rerun_all_crashed()
    print(f"Created {len(rerun_pairs)} reruns; skipped {len(skipped_runs)}")
    # Only dump the per-run skip reasons when something was skipped.
    if skipped_runs:
        print("Skipped details:", skipped_runs)
If you share:
- The output of prefect config view and prefect version from the same environment as the script
- One example CRASHED flow run ID the CLI shows
Marvin
10/06/2025, 3:07 PM