# ask-marvin
@Marvin I have a bunch of flow runs that have Crashed status. How do I automatically retry all of them? (Prefect 3)
Short answer: there isn’t a built-in “bulk rerun” button or CLI command in Prefect 3. You’ll create new runs for those CRASHED flow runs (you can’t flip a CRASHED run back to RETRYING). The most reliable way is a small script using the Prefect client that finds CRASHED runs and starts new runs from their deployments with the same parameters.

Quick sanity check via CLI (optional):
- See what’s CRASHED:
```bash
prefect flow-run ls --state CRASHED --limit 50
```
Programmatic bulk rerun (Sync client; copy/paste runnable):
```python
from typing import Optional

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.client.schemas.sorting import FlowRunSort


def rerun_all_crashed(limit_per_page: int = 200, max_runs: Optional[int] = None):
    created = []
    skipped = []
    offset = 0

    # get_client(sync_client=True) builds a sync client from the active profile/settings
    with get_client(sync_client=True) as client:
        # State names are display names matched exactly: "Crashed", not "CRASHED"
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(
                name=FlowRunFilterStateName(any_=["Crashed"])
            )
        )

        while True:
            batch = client.read_flow_runs(
                flow_run_filter=fr_filter,
                sort=FlowRunSort.ID_DESC,  # stable order so offset paging doesn't skip runs
                limit=limit_per_page,
                offset=offset,
            )
            if not batch:
                break

            for fr in batch:
                # Only rerun if this came from a deployment
                if fr.deployment_id is None:
                    skipped.append((fr.id, "no deployment_id on original run"))
                    continue

                # New run from the same deployment, with the same parameters and tags
                new_fr = client.create_flow_run_from_deployment(
                    fr.deployment_id,
                    parameters=dict(fr.parameters or {}),
                    name=f"{fr.name or fr.id}-retry",
                    tags=list(fr.tags or []),
                )
                created.append((fr.id, new_fr.id))

                if max_runs is not None and len(created) >= max_runs:
                    return created, skipped

            offset += len(batch)

    return created, skipped


if __name__ == "__main__":
    created, skipped = rerun_all_crashed()
    print(f"Created {len(created)} reruns; skipped {len(skipped)}")
    if skipped:
        print("Skipped details:", skipped)
```
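The `while`/`offset` loop in that script is plain offset pagination. Here is a Prefect-free sketch of the same pattern, where `fetch(limit, offset)` is a hypothetical stand-in for `client.read_flow_runs(flow_run_filter=..., limit=..., offset=...)`. Advancing by `len(batch)` and stopping on the first empty page is what makes it terminate; it assumes the result set doesn't shift while you page, which holds here as long as the query uses a stable sort and no other runs crash mid-loop (the new reruns start out Scheduled, so they never match the filter).

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_offset_pages(
    fetch: Callable[[int, int], List[T]], page_size: int = 200
) -> Iterator[T]:
    """Yield items from fetch(limit, offset) until it returns an empty page."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    offset = 0
    while True:
        batch = fetch(page_size, offset)
        if not batch:
            return
        yield from batch
        # Advance by what was actually returned, not by page_size
        offset += len(batch)


if __name__ == "__main__":
    # Hypothetical fake backend: 450 "runs" served 200 at a time
    fake_runs = [f"run-{i}" for i in range(450)]
    calls = []

    def fake_fetch(limit: int, offset: int) -> List[str]:
        calls.append((limit, offset))
        return fake_runs[offset:offset + limit]

    assert list(iter_offset_pages(fake_fetch)) == fake_runs
    print(calls)  # [(200, 0), (200, 200), (200, 400), (200, 450)]
```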
Notes on the rerun script:
- It creates a new flow run for each CRASHED run, copying its parameters and tags. The new runs get new IDs; the original runs keep their CRASHED state.
- Runs that were not started from a deployment (no `deployment_id`) are skipped. If you have those, we can discuss options.
- You can narrow the selection (e.g., to a specific deployment) by adding a `deployment_id` condition to `FlowRunFilter` if you’d like.

Relevant docs:
- Create a run from a deployment: Create flow run from deployment
- Read/list flow runs: Flow runs API

If you prefer async, I can share that version too. Do you want to target all CRASHED runs in the workspace, or only specific flows/deployments?
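The per-run decisions inside the rerun loop can be pulled into a pure function, which makes them easy to test or dry-run before anything is created. A sketch, assuming a hypothetical `CrashedRun` record holding only the fields the script reads. It also skips runs listed in `already_rerun`: the originals stay CRASHED after being rerun, so a second pass of the script would otherwise rerun them again. How you persist `already_rerun` (for example, a local file of original IDs) is up to you; the names here are illustrative, not Prefect APIs.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set, Tuple
from uuid import UUID


@dataclass
class CrashedRun:
    """Hypothetical stand-in for the FlowRun fields the script reads."""
    id: UUID
    name: Optional[str]
    deployment_id: Optional[UUID]
    parameters: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)


@dataclass
class RerunRequest:
    original_id: UUID
    deployment_id: UUID
    name: str
    parameters: Dict[str, Any]
    tags: List[str]


def plan_reruns(
    runs: List[CrashedRun], already_rerun: Set[UUID]
) -> Tuple[List[RerunRequest], List[Tuple[UUID, str]]]:
    """Split crashed runs into rerun requests and (id, reason) skips."""
    requests: List[RerunRequest] = []
    skipped: List[Tuple[UUID, str]] = []
    for run in runs:
        if run.deployment_id is None:
            skipped.append((run.id, "no deployment_id on original run"))
        elif run.id in already_rerun:
            # Original is still CRASHED; rerunning again would duplicate it
            skipped.append((run.id, "already rerun"))
        else:
            requests.append(
                RerunRequest(
                    original_id=run.id,
                    deployment_id=run.deployment_id,
                    name=f"{run.name or run.id}-retry",
                    parameters=dict(run.parameters),  # copy, don't share
                    tags=list(run.tags),
                )
            )
    return requests, skipped
```

Each `RerunRequest` maps onto one `client.create_flow_run_from_deployment(req.deployment_id, parameters=req.parameters, name=req.name, tags=req.tags)` call, so you can print the plan first and only create runs once it looks right.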
@Marvin The CLI found a bunch of crashed runs, but the script didn't.
Got it. If the CLI shows CRASHED runs but your script doesn’t, it’s almost always one of these:
- A different Prefect profile/workspace/API URL between your shell and the script
- A state “name” filter instead of a state “type” filter: names are display strings such as “Crashed” and are matched exactly, so “CRASHED” matches nothing, while types are stable
- A pagination or filter mismatch in the script

Let’s quickly verify the environment your script is hitting and switch to filtering by state type.

1) Check the CLI profile/workspace
```bash
prefect config view
prefect version
```
2) Print the script’s settings to confirm they match the CLI
```python
from prefect.context import get_settings_context
from prefect.settings import PREFECT_API_KEY, PREFECT_API_URL, PREFECT_UI_URL

print("Profile:", get_settings_context().profile.name)
print("API URL:", PREFECT_API_URL.value())
print("UI URL:", PREFECT_UI_URL.value())
# Only report whether a key is set, never the key itself
print("API Key set:", bool(PREFECT_API_KEY.value()))
```
3) Minimal check: count CRASHED by state TYPE (not name)
```python
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
from prefect.client.schemas.objects import StateType

with get_client(sync_client=True) as client:
    runs = client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.CRASHED])
            )
        ),
        limit=50,
    )
    print("Found crashed:", len(runs))
    for r in runs[:5]:
        # state.name is the display name (e.g. "Crashed"); the filter used state.type
        print(r.id, r.state.name if r.state else None, r.deployment_id)
```
If that still prints 0, grab one flow run ID the CLI showed and try to read it directly; if this raises ObjectNotFound (a 404), your script is pointed at a different workspace:
```python
from uuid import UUID

from prefect import get_client
from prefect.exceptions import ObjectNotFound

# replace with an ID from `prefect flow-run ls --state CRASHED`
target_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

with get_client(sync_client=True) as client:
    try:
        fr = client.read_flow_run(UUID(target_id))
        print(fr)
    except ObjectNotFound:
        print("Not found: the script is talking to a different workspace than the CLI")
```
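Environment variables are a common source of this mismatch: any `PREFECT_*` variable (such as `PREFECT_API_URL`, or `PREFECT_PROFILE` to select a profile) overrides what the active profile says, and IDEs, cron jobs, and containers often don't share your shell's environment. A stdlib-only sketch to dump them (the function name is hypothetical); run it both in the shell where the CLI works and where the script runs, then compare:

```python
import os
from typing import Dict, Mapping, Optional


def prefect_env_overrides(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return every PREFECT_* environment variable, masking the API key."""
    env = os.environ if environ is None else environ
    found: Dict[str, str] = {}
    for key in sorted(env):
        if not key.startswith("PREFECT_"):
            continue
        value = env[key]
        # Never print the key itself, only whether it is set
        found[key] = "<set>" if key == "PREFECT_API_KEY" and value else value
    return found


if __name__ == "__main__":
    overrides = prefect_env_overrides()
    if not overrides:
        print("No PREFECT_* environment variables set; profile/defaults apply.")
    for key, value in overrides.items():
        print(f"{key}={value}")
```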
Updated bulk rerun script (filters by state TYPE, so display names don’t matter):
```python
from typing import Optional

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import FlowRunSort


def rerun_all_crashed(limit_per_page: int = 200, max_runs: Optional[int] = None):
    created, skipped = [], []
    offset = 0

    with get_client(sync_client=True) as client:
        # Filter on the state TYPE, which doesn't depend on display names
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.CRASHED])
            )
        )

        while True:
            batch = client.read_flow_runs(
                flow_run_filter=fr_filter,
                sort=FlowRunSort.ID_DESC,  # stable order across pages
                limit=limit_per_page,
                offset=offset,
            )
            if not batch:
                break

            for fr in batch:
                if fr.deployment_id is None:
                    skipped.append((fr.id, "no deployment_id on original run"))
                    continue

                new_fr = client.create_flow_run_from_deployment(
                    fr.deployment_id,
                    parameters=dict(fr.parameters or {}),
                    name=f"{fr.name or fr.id}-retry",
                    tags=list(fr.tags or []),
                )
                created.append((fr.id, new_fr.id))

                if max_runs is not None and len(created) >= max_runs:
                    return created, skipped

            offset += len(batch)

    return created, skipped


if __name__ == "__main__":
    created, skipped = rerun_all_crashed()
    print(f"Created {len(created)} reruns; skipped {len(skipped)}")
    if skipped:
        print("Skipped details:", skipped)
```
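Why filtering by type is the safer choice, as a Prefect-free sketch: a flow run's state carries a type (an enum value such as CRASHED) and a name (a display string, “Crashed” for a default crashed state), and the filter compares values exactly. `RunState` and `select` are hypothetical stand-ins for the real state objects and for `FlowRunFilterStateName`/`FlowRunFilterStateType`, not Prefect code; the custom-named state is likewise hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass(frozen=True)
class RunState:
    """Hypothetical stand-in for a flow run's state."""
    type: str  # stable enum-like value, e.g. "CRASHED"
    name: str  # display name, e.g. "Crashed"


def select(
    states: Sequence[RunState],
    names: Optional[List[str]] = None,
    types: Optional[List[str]] = None,
) -> List[RunState]:
    """Keep states matching every given list exactly (case-sensitive), like SQL IN (...)."""
    return [
        s
        for s in states
        if (names is None or s.name in names) and (types is None or s.type in types)
    ]


states = [
    RunState("CRASHED", "Crashed"),
    RunState("COMPLETED", "Completed"),
    RunState("CRASHED", "Infra lost"),  # hypothetical custom-named crashed state
]

print(len(select(states, names=["CRASHED"])))  # 0: no display name is all caps
print(len(select(states, names=["Crashed"])))  # 1: misses the custom-named one
print(len(select(states, types=["CRASHED"])))  # 2: the type filter catches both
```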
If you share:
- the output of `prefect config view` and `prefect version` from the same environment as the script
- one example CRASHED flow run ID that the CLI shows

…I can pinpoint whether it’s an environment mismatch or a filter issue and get you to green quickly.

Refs:
- Flow runs API: Flow runs
- Create run from deployment: Create flow run from deployment