<@ULVA73B9P> Is there a way to batch retry Failed ...
# marvin-ai
t
@Marvin Is there a way to batch retry Failed flows on the Prefect UI? This is self hosted prefect server
m
thought for 653.9 seconds
Short answer: there isn’t a bulk “retry/rerun all failed” action in the Prefect UI today (including self-hosted). You can rerun individual flow runs from the UI, but not in batch.
Good alternatives:
1) Automate retries going forward
- Create an Automation: Trigger = “Flow run entered Failed”, Action = “Run deployment” (optionally scoped by tags/deployment).
- Docs/API ref for Automations: Prefect 3 Automations API
- Background blog: Automations and observability
2) One-off bulk rerun with a short Python script (server/REST client)
The snippet below finds failed runs for a deployment, then creates new runs using the original parameters. Run it from a machine that can reach your self-hosted API and with your Prefect profile pointing at your server.
import asyncio

from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment

# Client-side filter schemas; the state-type filter class is FlowRunFilterStateType
from prefect.client.schemas.filters import (
    DeploymentFilter,
    DeploymentFilterId,
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

DEPLOYMENT_NAME = "your-flow-name/your-deployment-name"
MAX_RUNS = 100  # change as needed

async def main():
    async with get_client() as client:
        # Look up the deployment by name to get its ID
        dep = await client.read_deployment_by_name(DEPLOYMENT_NAME)

        # List FAILED flow runs for this deployment
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED])
                )
            ),
            deployment_filter=DeploymentFilter(
                id=DeploymentFilterId(any_=[dep.id])
            ),
            limit=MAX_RUNS,
        )

        print(f"Found {len(runs)} failed runs for {DEPLOYMENT_NAME}")

        # Rerun each by creating a new flow run on the same deployment with the same parameters
        for fr in runs:
            new_run = await run_deployment(
                name=DEPLOYMENT_NAME,
                parameters=fr.parameters,
                flow_run_name=f"{fr.name}-retry",
                timeout=0,  # return once the run is created instead of waiting for it to finish
            )
            print(f"Re-ran {fr.id} as new flow run {new_run.id}")

asyncio.run(main())
- API references you may find useful:
  - Flow runs API: Flow runs API (3.x)
  - run_deployment function: run_deployment
3) CLI + scripting
- You can list failed runs with the CLI, then loop over them in a script to create new runs. CLI help: prefect flow-run ls --help and prefect deployment run --help
- There isn’t a built-in “bulk retry” CLI command, but a small wrapper script can do it.
A small nuance: in Prefect, “retry” normally refers to in-run retries configured with retries= on the flow or its tasks. Once a run is in a terminal Failed state, “retry” typically means creating a new run (as shown above) rather than changing the state of the existing run.
If you can share how you’d like to target the reruns (specific deployments, tags, or time window), I can tailor the script exactly to your criteria.
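For the CLI route, here is a minimal sketch of the kind of wrapper meant above. It only builds the prefect deployment run commands and prints them as a dry run. The helper name build_rerun_commands and the sample parameters are hypothetical, and the use of the --params flag (one JSON object per run) is an assumption you should confirm against prefect deployment run --help on your version.

```python
import json
import shlex
from typing import Any, Iterable, Mapping


def build_rerun_commands(
    deployment_name: str,
    failed_run_parameters: Iterable[Mapping[str, Any]],
) -> list[list[str]]:
    """Return one `prefect deployment run` argv per failed run (hypothetical helper)."""
    commands = []
    for parameters in failed_run_parameters:
        argv = ["prefect", "deployment", "run", deployment_name]
        if parameters:
            # Assumption: --params accepts the whole parameter set as one JSON object.
            argv += ["--params", json.dumps(dict(parameters), sort_keys=True)]
        commands.append(argv)
    return commands


if __name__ == "__main__":
    # Hypothetical parameters copied from two failed runs.
    failed = [{"date": "2024-01-01"}, {}]
    for argv in build_rerun_commands("your-flow-name/your-deployment-name", failed):
        # Dry run: print the shell-quoted command instead of executing it.
        print(shlex.join(argv))
```

Once the printed commands look right, each argv list could be passed to subprocess.run(argv, check=True) to actually create the runs.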
t
@Marvin What if I don't want to create a new flow run and just want to retry the original one?
m
thought for 98.3 seconds
Short answer: In Prefect 3.x, once a flow run has reached a terminal state (Failed, Completed, Cancelled, Crashed), it’s considered immutable and you can’t “retry the same run” from the UI or API in a supported way. The UI’s “Rerun” creates a brand new flow run with the same parameters.
Only exception: If the run is Paused/Suspended, you can resume that same run ID using resume_flow_run (but this does not apply to Failed runs).
- API ref: Resume a flow run
Advanced/unsupported: There is a low-level client endpoint to force-change a run’s state (e.g., back to Scheduled). While you might be able to coerce a failed run back to a schedulable state, this is not supported and can yield inconsistent history/lineage. If you must go this route, proceed carefully and test in non-prod.
- Flow runs API (Python): Flow runs API
Recommended pattern:
- Create a new run (via UI “Rerun,” CLI, or script) and optionally tag or annotate it to point back to the original run for traceability.
- For the future, use an Automation to trigger “Run deployment” on failures so retries happen automatically: Automations API
If you want, I can draft a guarded script that attempts a forced reschedule, but I strongly recommend the supported “create a new run” approach.
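To illustrate the recommended pattern, here is a rough sketch of how a new run could point back to the original one. The FailedRun class is a hypothetical stand-in for the fields you would read from a real flow run, and the retry-of:<id> tag format is an invented convention, not something Prefect defines. The returned dictionary is meant to be passed to run_deployment, assuming its name, parameters, flow_run_name, tags, and timeout keyword arguments.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FailedRun:
    """Hypothetical stand-in for the fields read from a Prefect flow run."""

    id: str
    name: str
    parameters: dict[str, Any] = field(default_factory=dict)
    tags: list[str] = field(default_factory=list)


def rerun_kwargs(deployment_name: str, run: FailedRun) -> dict[str, Any]:
    """Build keyword arguments for run_deployment that link back to `run`."""
    return {
        "name": deployment_name,
        "parameters": dict(run.parameters),
        "flow_run_name": f"{run.name}-retry",
        # Keep the original tags and add one pointing at the failed run
        # (the "retry-of:" prefix is an illustrative convention).
        "tags": [*run.tags, f"retry-of:{run.id}"],
        # timeout=0 asks run_deployment to return without waiting for completion.
        "timeout": 0,
    }
```

In the bulk script this would be used as await run_deployment(**rerun_kwargs(DEPLOYMENT_NAME, fr)), after which filtering the UI by the retry-of tag shows which new run replaced which failed one.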