# ask-marvin
i
@Marvin I saw some examples of making the `flow` decorator consume arguments from the parameters. Could this be used to specify flow retry logic as part of an argument? I am talking about something like:

```python
@flow(retries=<taken from below>)
def my_flow(retries=5): ...  # retries is passed above
```
m
thought for 300.2 seconds
Short answer: not directly. In Python (and in Prefect 3.x), decorator arguments like `@flow(retries=...)` are evaluated at import time, so they can’t depend on the flow’s runtime parameters for that same run. Good news: you can still set flow retries dynamically per run using one of these patterns:

- Per-call overrides with `with_options`:

```python
from prefect import flow

@flow
def my_flow(x: int):
    ...

# Choose retries at call time (e.g., based on some input/param)
retries = 3
delay = 10
my_flow.with_options(retries=retries, retry_delay_seconds=delay)(x=1)
```
- Wrapper flow that takes parameters and applies them to the inner flow:

```python
from prefect import flow

@flow
def inner_flow(data: str):
    ...

@flow
def driver_flow(retries: int = 0, delay: int = 0, data: str = "hi"):
    # Set the inner flow’s retry policy based on the wrapper’s params
    return inner_flow.with_options(
        retries=retries, retry_delay_seconds=delay
    )(data=data)
```
Notes and gotchas:
- Changing retries via runtime parameters cannot affect the current flow’s already-scheduled retry policy. Use `with_options` on the flow you call.
- The same `with_options` pattern works for tasks as well, and tasks have additional knobs like `retry_delay_seconds` accepting a list or callable.
- If you’re triggering via a Deployment/UI/API, there isn’t a built-in parameter override for retries. Options:
  - Bake different retry policies into different deployments of the same flow, or
  - Use the wrapper-flow pattern above so you can pass retries as parameters to the wrapper.

References:
- Prefect retries guide: Retries in Prefect 3
- Flow decorator and options (source): prefect/flows.py

If you can share how you’re triggering runs (Python, Deployment, UI, automations), I can tailor the example to your setup.
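To make the list-of-delays option concrete, here is a minimal sketch in plain Python (no Prefect API involved; the helper name is hypothetical) of building such a per-attempt delay list:

```python
# Hypothetical helper: build an exponential backoff schedule as a plain list
# of seconds, one delay per retry attempt, suitable for passing to a task's
# retry_delay_seconds (tasks, unlike flows, accept a list there).
def backoff_schedule(base: int, retries: int) -> list[int]:
    return [base * (2 ** i) for i in range(retries)]

print(backoff_schedule(10, 4))  # → [10, 20, 40, 80]
```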
thought for 12.8 seconds
Glad that helps! If you want, I can tailor a quick example for your setup—are you triggering runs via:
- a Python script (direct calls),
- a Deployment (CLI/UI/API),
- or a wrapper/driver flow?

I can show the cleanest way to apply `with_options` or set up multiple deployments with different retry policies for your case.
i
@Marvin that's reasonable. I have a complex setup that works well for my projects: I load flow deployment descriptions (params, cron, ...) from a config file, then create deployments from Python. For one particular use case, I want to be able to configure the retries there too, since I want an exaggerated retry policy for only one particular deployment. From what I read above, it's not possible the way I described in my first msg, but it's possible with `with_options`, which will be ideal for me.
m
thought for 74.6 seconds
That approach will work well. Since you’re creating deployments from Python, you can apply `with_options` just before `.deploy()` for the one deployment that needs the “exaggerated” policy. Example patterns you can drop into your config-driven deploy script:

- If your flow function is importable locally:

```python
from prefect import flow

@flow
def my_flow(param: int):
    ...

def deploy_from_config(cfg: dict):
    for d in cfg["deployments"]:
        f = my_flow
        # Only apply retries if provided for this deployment
        if "retries" in d or "retry_delay_seconds" in d:
            f = f.with_options(
                retries=d.get("retries"),
                retry_delay_seconds=d.get("retry_delay_seconds"),
            )
        f.deploy(
            name=d["name"],
            work_pool_name=d["work_pool"],
            parameters=d.get("parameters"),
            cron=d.get("cron"),
            # any other fields you use (image, tags, description, etc.)
        )
```
- If you load the flow from a source repo/entrypoint:

```python
from prefect.flows import Flow

def deploy_from_config(cfg: dict):
    for d in cfg["deployments"]:
        flow_obj = Flow.from_source(
            source=d["source"],          # e.g. "https://github.com/org/repo"
            entrypoint=d["entrypoint"],  # e.g. "flows/etl.py:my_flow"
        )
        if "retries" in d or "retry_delay_seconds" in d:
            flow_obj = flow_obj.with_options(
                retries=d.get("retries"),
                retry_delay_seconds=d.get("retry_delay_seconds"),
            )
        flow_obj.deploy(
            name=d["name"],
            work_pool_name=d["work_pool"],
            parameters=d.get("parameters"),
            cron=d.get("cron"),
        )
```
Notes:
- Flow-level `with_options` supports `retries` and `retry_delay_seconds` (per-run delay). This sets the retry policy on runs of that deployment.
- If you also need task-specific retry behavior, you can apply `with_options` to tasks when you call them (e.g., branch based on deployment name via `prefect.runtime.deployment.name`), and tasks support richer delay patterns (lists/callables).
- This keeps your base flow definition clean while letting you dial up retries for a single deployment.

Refs:
- Retries in Prefect 3
- Source for deploy/from_source: prefect/flows.py

If you share a sample of your config shape, I can sketch a small parser/helper to map `retries`/`retry_delay_seconds` cleanly and validate them.
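As a sketch of such a helper, assuming a hypothetical config shape where each deployment entry may carry `retries` and `retry_delay_seconds` keys (the function name and validation rules here are illustrative, not a Prefect API):

```python
# Hypothetical helper: pull and validate retry settings from one deployment's
# config dict, returning only the keys that are actually present so unset
# values never clobber the flow's defaults.
def retry_overrides(d: dict) -> dict:
    out = {}
    if "retries" in d:
        retries = int(d["retries"])
        if retries < 0:
            raise ValueError("retries must be >= 0")
        out["retries"] = retries
    if "retry_delay_seconds" in d:
        delay = float(d["retry_delay_seconds"])
        if delay < 0:
            raise ValueError("retry_delay_seconds must be >= 0")
        out["retry_delay_seconds"] = delay
    return out

print(retry_overrides({"name": "prod", "retries": 5, "retry_delay_seconds": 60}))
```

The returned dict could then be splatted into `flow.with_options(**retry_overrides(d))` only when it is non-empty.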
Short answer: no. In Prefect 3.x, the arguments you pass to the `@flow` decorator (like `retries` and `retry_delay_seconds`) are fixed at decoration time. They can’t be pulled from the flow’s runtime parameters because the decorator runs when the module is imported, before the flow’s parameters exist. What you can do instead:

- Set retries statically at definition time:

```python
@flow(retries=5, retry_delay_seconds=30)
def my_flow(...):
    ...
```
- Set retries per invocation using `with_options` (compute the value however you like, then call the flow):

```python
@flow
def my_flow(x: int):
    ...

def run_my_flow(retries: int = 5, delay: float = 30, **params):
    return my_flow.with_options(
        retries=retries,
        retry_delay_seconds=delay
    )(**params)

# Example
run_my_flow(retries=2, delay=10, x=42)
```
- For subflows, you can also choose retries per call:

```python
@flow
def child():
    ...

@flow
def parent(mode: str = "normal"):
    n = 1 if mode == "fast" else 5
    child.with_options(retries=n)()
```
Notes:
- There isn’t a supported pattern to dynamically change the current run’s flow-level retries from inside the flow body.
- There’s no `Retry` signal you can raise to “add” retries on the fly.
- If you’re deploying, you can bake the desired retries into the flow object before you deploy (e.g., `my_flow.with_options(retries=...)`), or create multiple deployments with different retry policies.

Docs:
- Retries in Prefect 3

If you can share how you’re triggering runs (deployment vs. direct Python call), I can suggest the cleanest way to parameterize retries for your setup.
i
@Marvin the `with_options` isn't working: retries are not showing up in the run UI and also not taking effect. I have a flow with retries set to 0 by default. But in the per-deployment config, I load the config and do `flow.with_options(...).deploy(...)`
m
thought for 148.9 seconds
Thanks for the details — I think I see what’s happening. Short version: calling `with_options(...)` only at deploy time won’t change the flow’s runtime behavior unless the deployment’s entrypoint points to the overridden Flow object. If the entrypoint points to your original `@flow(retries=0)` symbol, the worker will import that and you’ll get 0 retries (and the UI won’t show retries).

Why: `with_options` returns a new Flow object with different settings. If you call it inline and immediately call `.deploy(...)`, Prefect will often resolve the entrypoint to the original, module-level flow symbol (the one with `retries=0`), so your overrides aren’t applied at runtime.

How to fix (two reliable patterns):

- Pattern A: assign the overridden flow to a module-level name and deploy that name.

flows.py:

```python
from prefect import flow

@flow(retries=0)
def my_flow():
    ...

# Create a distinct, named flow object with overrides
prod_flow = my_flow.with_options(retries=3, retry_delay_seconds=10)
```
deploy.py:

```python
from flows import prod_flow

if __name__ == "__main__":
    prod_flow.deploy(
        name="prod",
        work_pool_name="your-pool",
    )
```
Ensure the deployment entrypoint shows something like `flows.py:prod_flow` (not `flows.py:my_flow`). Then runs from this deployment will have 3 retries and you’ll see them in the run UI.

- Pattern B: if you deploy from a remote source, point the entrypoint at the overridden name.

```python
from prefect.flows import Flow

Flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows.py:prod_flow",  # points to the overridden object
).deploy(
    name="prod",
    work_pool_name="your-pool",
)
```
Quick checks to confirm:
- In the UI, open the Deployment > Details. The Entrypoint should be `...:prod_flow` (or whatever name you assigned after `with_options`), not the original decorated function.
- Trigger a run and check the run details; “Retries”/attempts should reflect your override, and failed runs should show retry scheduling.
- Make sure failures actually cause the flow to fail (flow-level retries only occur when the flow run fails; task retries are separate).

If you’d like, paste a small snippet of how you’re calling `with_options(...).deploy(...)` and the deployment’s entrypoint shown in the UI, and I’ll pinpoint the exact adjustment.

Docs for reference:
- Flow/task retries in Prefect 3
- Deployments and prefect.yaml (for broader deployment patterns)
i
@Marvin I see, I guess this is pointless for my use case then (setting retry logic per deployment), am I right? Like, I have a central flow that would be deployed many times with different parameters and schedules..
m
thought for 58.2 seconds
You’re right about one key thing: in Prefect 3.x there isn’t a first-class “set flow-level retries per deployment” knob. Flow-level retries are defined on the Flow object itself. The `deploy()` API and `prefect.yaml` do not have a `retries` field to override the flow at deployment time. That said, it’s not pointless—you can still achieve per-deployment retry policies with one of these patterns:

- Use multiple Flow variants from your central flow (recommended). Define importable variants at module scope with `with_options(...)` and deploy each one. Example:

```python
# flows.py
from prefect import flow

@flow(retries=0)
def central_flow(param: str = "default"):
    ...

fast_retry = central_flow.with_options(
    name="central_flow-fast-retry", retries=3, retry_delay_seconds=5
)

slow_retry = central_flow.with_options(
    name="central_flow-slow-retry", retries=5, retry_delay_seconds=60
)
```
Deploy each with its own parameters/schedule:

```python
from flows import fast_retry, slow_retry

fast_retry.deploy(
    name="central-fast",
    work_pool_name="your-pool",
    cron="*/5 * * * *",
    parameters={"param": "staging"},
)

slow_retry.deploy(
    name="central-slow",
    work_pool_name="your-pool",
    cron="0 * * * *",
    parameters={"param": "prod"},
)
```
Important: the deployment entrypoint must reference the overridden object (e.g., `flows.py:fast_retry`). Then retries will show up and take effect for runs from that deployment.

- If you need many variants, generate them as importable names. You can programmatically create variants at module import time so each has a stable entrypoint:

```python
# flows.py
from prefect import flow

@flow(retries=0)
def central_flow(...):
    ...

def register_variant(name, retries, delay):
    variant = central_flow.with_options(
        name=name, retries=retries, retry_delay_seconds=delay
    )
    globals()[name] = variant  # make it importable as flows.<name>
    return variant

register_variant("central_us_west", retries=3, delay=10)
register_variant("central_eu_prod", retries=5, delay=60)
```
Then deploy using those entrypoints (e.g., `flows.py:central_us_west`, `flows.py:central_eu_prod`).

- Alternative if “new runs” are acceptable instead of “retry attempts”: use deployment triggers/automations to start a new run when a flow run fails. This approximates retries per deployment but won’t show as “attempt 2” of the same run; it creates a new run.

A couple of gotchas to avoid:
- Calling `flow.with_options(...).deploy(...)` inline without binding the result to an importable name often means the deployment will still point to the original flow symbol, so your overrides won’t apply.
- Task-level retries are separate; if your goal is resilience inside the flow, consider task retries as well, but those also need to be set on the task objects.

If you share:
- how many deployments you expect,
- whether you’re deploying from local code or `Flow.from_source(...)`,
- and whether “new run on failure” is acceptable,

I can suggest the cleanest structure for your setup.

Refs:
- Flow/task retries in Prefect 3
- Deployments with prefect.yaml
i
@Marvin Keeping up with this `my_flow.with_options()`, it wouldn't help if I don't define any retries in the original flow, right?
m
thought for 20.5 seconds
Short answer: you do NOT need to define retries on the original flow. `with_options(retries=...)` will add/override retries even if the base flow has none (i.e., `retries=0`).

Key points:
- `with_options` returns a new Flow object with the provided overrides. It doesn’t matter if the original flow had `retries=0` or didn’t specify retries at all.
- The override only takes effect if the deployment’s entrypoint points to the overridden Flow object (the result of `with_options`), not the original.

Example:
```python
from prefect import flow

@flow  # no retries set here (defaults to 0)
def my_flow():
    ...

# Create a per-deployment variant with retries
prod_flow = my_flow.with_options(retries=3, retry_delay_seconds=10)

# Deploy the variant and ensure entrypoint is flows.py:prod_flow
prod_flow.deploy(
    name="my-flow-prod",
    work_pool_name="your-pool",
)
```
If the deployment entrypoint is `flows.py:my_flow`, you’ll still get 0 retries. Make sure it’s `flows.py:prod_flow`.

Docs:
- Retries in Prefect 3
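A minimal plain-Python sketch of these override semantics, using a simplified stand-in for the flow’s settings (this is not Prefect’s actual implementation, just an illustration of why the base flow needs no retries defined):

```python
import dataclasses

# Simplified stand-in for a flow's configurable settings.
@dataclasses.dataclass(frozen=True)
class FlowSettings:
    retries: int = 0
    retry_delay_seconds: float = 0.0

def with_options(base: FlowSettings, **overrides) -> FlowSettings:
    # Returns a NEW settings object with the overrides applied;
    # the original is untouched.
    return dataclasses.replace(base, **overrides)

base = FlowSettings()                  # no retries set, defaults to 0
variant = with_options(base, retries=3)
print(base.retries, variant.retries)   # → 0 3
```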
i
@Marvin I see, so the retries are a property of the flow and not the deployment; that's why it's fundamentally impossible to change them on the fly per deployment. Am I right?
m
thought for 14.7 seconds
Yes — in Prefect 3.x, retries are a property of the Flow object, not the deployment. There isn’t a deployment-level field to override flow-level retries “on the fly.”

You can still get per-deployment behavior by:
- Pointing each deployment’s entrypoint at a different Flow object created via `my_flow.with_options(retries=..., retry_delay_seconds=...)`
- Or using deployment triggers/automations to start a new run on failure (approximate retries per deployment, but as new runs rather than retry attempts)

Refs:
- Retries in Prefect 3
- Deployments overview