<@ULVA73B9P>, how can I programmatically trigger a...
# marvin-ai
r
@Marvin, how can I programmatically trigger a new flow run when I set the flow run state to Failed inside a try/except block?
m
Short answer: trigger a new run by calling `prefect.deployments.run_deployment(...)` inside your `except` block, then re-raise to fail the current run. This is the simplest and most reliable way in Prefect 3.x. Example pattern:
```python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment

@flow(retries=0)  # prefer built-in retries if you just want a retry
def my_flow():
    logger = get_run_logger()
    try:
        # your logic that may fail
        1 / 0
    except Exception as exc:
        logger.error(f"Flow logic failed: {exc}")

        # Kick off a new run of a deployment (this can be the same flow/deployment)
        # Replace "my-flow/my-deployment" with your actual "<flow-name>/<deployment-name>"
        new_run = run_deployment(
            name="my-flow/my-deployment",
            parameters={"foo": "bar"},  # optional
            flow_run_name="retry-from-code",  # optional
            timeout=0,  # return immediately; the default (None) waits for the new run to finish
            as_subflow=False,  # set False to create an independent run
            idempotency_key=None,  # optionally set to avoid dupes
        )
        logger.info(f"Triggered new run: {new_run.id}")

        # Mark THIS run as failed by re-raising
        raise
```
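If the new run targets the same deployment and fails the same way, each failure triggers another run. A minimal sketch of one way to bound that chain, in plain Python with no Prefect calls: it assumes a hypothetical `attempt` flow parameter that counts chained runs, and a hypothetical helper `plan_follow_up_run` that returns the keyword arguments for the next run or `None` once the cap is reached. The idempotency key format is also just an illustration.

```python
from typing import Any, Optional

MAX_ATTEMPTS = 3  # hypothetical cap on the number of chained runs


def plan_follow_up_run(
    flow_run_id: str,
    parameters: dict[str, Any],
    max_attempts: int = MAX_ATTEMPTS,
) -> Optional[dict[str, Any]]:
    """Return keyword arguments for the follow-up run, or None at the cap.

    `attempt` is a hypothetical flow parameter used only to count chained runs.
    """
    attempt = int(parameters.get("attempt", 1))
    if attempt >= max_attempts:
        return None  # stop the chain instead of triggering yet another run
    return {
        # Carry the original parameters forward and bump the counter.
        "parameters": {**parameters, "attempt": attempt + 1},
        # One key per failed run, so calling twice does not create duplicates.
        "idempotency_key": f"follow-up-of-{flow_run_id}",
    }
```

In the `except` block, the result could be unpacked into the call, for example `run_deployment(name="my-flow/my-deployment", as_subflow=False, **plan)`, and the call skipped when `plan` is `None`. The flow itself would need to accept `attempt` as a parameter for the counter to carry over.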
Notes and options:
- Prefer built-in flow retries if your goal is to retry the same run instead of starting a brand-new run:
```python
from prefect import flow

@flow(retries=3, retry_delay_seconds=30)
def my_flow():
    ...
```
This moves the run into AwaitingRetry automatically on failure.
- `run_deployment` creates a new flow run for the target deployment and returns the created flow run metadata. When called from inside a flow, it is tracked as a subflow by default; set `as_subflow=False` to keep it separate so the new run isn’t cancelled when the current run fails.
- If you want to wait for the triggered run and inspect its outcome, you can optionally do:
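```python
from prefect.flow_runs import wait_for_flow_run

# Note: wait_for_flow_run is async in Prefect 3.x (check your version), so await it.
# It returns the finished FlowRun; inspect finished_run.state for the outcome.
finished_run = await wait_for_flow_run(new_run.id)
```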
- You can also handle this without code by using an Automation: trigger “Run deployment” when a “Flow run Failed” event occurs for your flow/deployment. This is great for decoupling (no code changes) and avoids accidental infinite loops.
- See Automations: Automations docs

References:
- `run_deployment` API: prefect.deployments.run_deployment
- Flow decorator (retries, hooks): prefect.flow

Quick question to tailor advice: Do you want a brand-new run of the same deployment on failure, or would built-in retries on the same run meet your needs?