Idriss Bellil
08/29/2025, 8:00 AMflow
decorator consume arguments from the parameters, could this be used to specify flow retry logic as part of an argument I am talking about something like:
@flow(retries=<taken from below>)
def my_flow(retries=5):... # retries is passed above
Marvin
08/29/2025, 8:00 AMMarvin
08/29/2025, 8:01 AMMarvin
08/29/2025, 8:05 AM@flow(retries=...)
are evaluated at import time, so they can’t depend on the flow’s runtime parameters for that same run.
Good news: you can still set flow retries dynamically per run using one of these patterns:
- Per-call overrides with with_options
from prefect import flow
@flow
def my_flow(x: int):
...
# Choose retries at call time (e.g., based on some input/param)
retries = 3
delay = 10
my_flow.with_options(retries=retries, retry_delay_seconds=delay)(x=1)
- Wrapper flow that takes parameters and applies them to the inner flow
from prefect import flow
@flow
def inner_flow(data: str):
...
@flow
def driver_flow(retries: int = 0, delay: int = 0, data: str = "hi"):
# Set the inner flow’s retry policy based on the wrapper’s params
return inner_flow.with_options(
retries=retries, retry_delay_seconds=delay
)(data=data)
Notes and gotchas:
- Changing retries via runtime parameters cannot affect the current flow’s already-scheduled retry policy. Use with_options
on the flow you call.
- The same with_options
pattern works for tasks as well, and tasks have additional knobs like retry_delay_seconds
accepting a list or callable.
- If you’re triggering via a Deployment/UI/API, there isn’t a built-in parameter override for retries. Options:
- Bake different retry policies into different deployments of the same flow, or
- Use the wrapper-flow pattern above so you can pass retries as parameters to the wrapper.
References:
- Prefect retries guide: Retries in Prefect 3
- Flow decorator and options (source): prefect/flows.py
If you can share how you’re triggering runs (Python, Deployment, UI, automations), I can tailor the example to your setup.Marvin
08/29/2025, 8:08 AMMarvin
08/29/2025, 8:08 AMwith_options
or set up multiple deployments with different retry policies for your case.Idriss Bellil
08/29/2025, 8:13 AMwith_options
which will be ideal for me.Marvin
08/29/2025, 8:13 AMMarvin
08/29/2025, 8:14 AMwith_options
just before .deploy()
for the one deployment that needs the “exaggerated” policy.
Example patterns you can drop into your config-driven deploy script:
- If your flow function is importable locally
from prefect import flow
@flow
def my_flow(param: int):
...
def deploy_from_config(cfg: dict):
for d in cfg["deployments"]:
f = my_flow
# Only apply retries if provided for this deployment
if "retries" in d or "retry_delay_seconds" in d:
f = f.with_options(
retries=d.get("retries"),
retry_delay_seconds=d.get("retry_delay_seconds"),
)
f.deploy(
name=d["name"],
work_pool_name=d["work_pool"],
parameters=d.get("parameters"),
cron=d.get("cron"),
# any other fields you use (image, tags, description, etc.)
)
- If you load the flow from a source repo/entrypoint
from prefect.flows import Flow
def deploy_from_config(cfg: dict):
for d in cfg["deployments"]:
flow_obj = Flow.from_source(
source=d["source"], # e.g. "<https://github.com/org/repo>"
entrypoint=d["entrypoint"], # e.g. "flows/etl.py:my_flow"
)
if "retries" in d or "retry_delay_seconds" in d:
flow_obj = flow_obj.with_options(
retries=d.get("retries"),
retry_delay_seconds=d.get("retry_delay_seconds"),
)
flow_obj.deploy(
name=d["name"],
work_pool_name=d["work_pool"],
parameters=d.get("parameters"),
cron=d.get("cron"),
)
Notes:
- Flow-level with_options
supports retries
and retry_delay_seconds
(per-run delay). This sets the retry policy on runs of that deployment.
- If you also need task-specific retry behavior, you can apply with_options
to tasks when you call them (e.g., branch based on deployment name via prefect.runtime.deployment.name
), and tasks support richer delay patterns (lists/callables).
- This keeps your base flow definition clean while letting you dial up retries for a single deployment.
Refs:
- Retries in Prefect 3
- Source for deploy/from_source: prefect/flows.py
If you share a sample of your config shape, I can sketch a small parser/helper to map `retries`/`retry_delay_seconds` cleanly and validate them.Marvin
08/29/2025, 8:15 AM@flow
decorator (like retries
and retry_delay_seconds
) are fixed at decoration time. They can’t be pulled from the flow’s runtime parameters because the decorator runs when the module is imported, before the flow’s parameters exist.
What you can do instead:
- Set retries statically at definition time:
@flow(retries=5, retry_delay_seconds=30)
def my_flow(...):
...
- Set retries per invocation using with_options (compute the value however you like, then call the flow):
@flow
def my_flow(x: int):
...
def run_my_flow(retries: int = 5, delay: float = 30, **params):
return my_flow.with_options(
retries=retries,
retry_delay_seconds=delay
)(**params)
# Example
run_my_flow(retries=2, delay=10, x=42)
- For subflows, you can also choose retries per call:
@flow
def child():
...
@flow
def parent(mode: str = "normal"):
n = 1 if mode == "fast" else 5
child.with_options(retries=n)()
Notes:
- There isn’t a supported pattern to dynamically change the current run’s flow-level retries from inside the flow body.
- There’s no Retry
signal you can raise to “add” retries on the fly.
- If you’re deploying, you can bake the desired retries into the flow object before you deploy (e.g., my_flow.with_options(retries=...)
), or create multiple deployments with different retry policies.
Docs:
- Retries in Prefect 3
If you can share how you’re triggering runs (deployment vs. direct Python call), I can suggest the cleanest way to parameterize retries for your setup.Idriss Bellil
08/29/2025, 10:35 AMMarvin
08/29/2025, 10:35 AMMarvin
08/29/2025, 10:38 AMwith_options(...)
only at deploy time won’t change the flow’s runtime behavior unless the deployment’s entrypoint points to the overridden Flow object. If the entrypoint points to your original @flow(retries=0)
symbol, the worker will import that and you’ll get 0 retries (and the UI won’t show retries).
Why: with_options
returns a new Flow object with different settings. If you call it inline and immediately call .deploy(...)
, Prefect will often resolve the entrypoint to the original, module-level flow symbol (the one with retries=0
), so your overrides aren’t applied at runtime.
How to fix (two reliable patterns)
- Pattern A: assign the overridden flow to a module-level name and deploy that name.
- flows.py
from prefect import flow
@flow(retries=0)
def my_flow():
...
# Create a distinct, named flow object with overrides
prod_flow = my_flow.with_options(retries=3, retry_delay_seconds=10)
- deploy.py
from flows import prod_flow
if __name__ == "__main__":
prod_flow.deploy(
name="prod",
work_pool_name="your-pool",
)
Ensure the deployment entrypoint shows something like flows.py:prod_flow
(not flows.py:my_flow
). Then runs from this deployment will have 3 retries and you’ll see them in the run UI.
- Pattern B: if you deploy from a remote source, point the entrypoint at the overridden name
from prefect.flows import Flow
Flow.from_source(
source="<https://github.com/your-org/your-repo>",
entrypoint="flows.py:prod_flow", # points to the overridden object
).deploy(
name="prod",
work_pool_name="your-pool",
)
Quick checks to confirm
- In the UI, open the Deployment > Details. The Entrypoint should be ...:prod_flow
(or whatever name you assigned after with_options
), not the original decorated function.
- Trigger a run and check the run details; “Retries”/attempts should reflect your override, and failed runs should show retry scheduling.
- Make sure failures actually cause the flow to fail (flow-level retries only occur when the flow run fails; task retries are separate).
If you’d like, paste a small snippet of how you’re calling with_options(...).deploy(...)
and the deployment’s entrypoint shown in the UI, and I’ll pinpoint the exact adjustment.
Docs for reference:
- Flow/task retries in Prefect 3
- Deployments and prefect.yaml (for broader deployment patterns)Idriss Bellil
08/29/2025, 10:42 AMMarvin
08/29/2025, 10:42 AMMarvin
08/29/2025, 10:43 AMdeploy()
API and prefect.yaml
do not have a retries
field to override the flow at deployment time.
That said, it’s not pointless—you can still achieve per-deployment retry policies with one of these patterns:
- Use multiple Flow variants from your central flow (recommended)
- Define importable variants at module scope with with_options(...)
and deploy each one.
- Example:
# flows.py
from prefect import flow
@flow(retries=0)
def central_flow(param: str = "default"):
...
fast_retry = central_flow.with_options(
name="central_flow-fast-retry", retries=3, retry_delay_seconds=5
)
slow_retry = central_flow.with_options(
name="central_flow-slow-retry", retries=5, retry_delay_seconds=60
)
Deploy each with its own parameters/schedule:
from flows import fast_retry, slow_retry
fast_retry.deploy(
name="central-fast",
work_pool_name="your-pool",
cron="*/5 * * * *",
parameters={"param": "staging"},
)
slow_retry.deploy(
name="central-slow",
work_pool_name="your-pool",
cron="0 * * * *",
parameters={"param": "prod"},
)
Important: the deployment entrypoint must reference the overridden object (e.g., flows.py:fast_retry
). Then retries will show up and take effect for runs from that deployment.
- If you need many variants, generate them as importable names
- You can programmatically create variants at module import time so each has a stable entrypoint:
# flows.py
from prefect import flow
@flow(retries=0)
def central_flow(...):
...
def register_variant(name, retries, delay):
variant = central_flow.with_options(name=name, retries=retries, retry_delay_seconds=delay)
globals()[name] = variant # make it importable as flows.<name>
return variant
register_variant("central_us_west", retries=3, delay=10)
register_variant("central_eu_prod", retries=5, delay=60)
Then deploy using those entrypoints (e.g., flows.py:central_us_west
, flows.py:central_eu_prod
).
- Alternative if “new runs” are acceptable instead of “retry attempts”
- Use deployment triggers/automations to start a new run when a flow run fails. This approximates retries per deployment but won’t show as “attempt 2” of the same run; it creates a new run.
A couple of gotchas to avoid:
- Calling flow.with_options(...).deploy(...)
inline without binding the result to an importable name often means the deployment will still point to the original flow symbol, so your overrides won’t apply.
- Task-level retries are separate; if your goal is resilience inside the flow, consider task retries as well, but those also need to be set on the task objects.
If you share:
- how many deployments you expect,
- whether you’re deploying from local code or Flow.from_source(...)
,
- and whether “new run on failure” is acceptable,
I can suggest the cleanest structure for your setup.
Refs:
- Flow/task retries in Prefect 3
- Deployments with prefect.yamlIdriss Bellil
08/29/2025, 12:21 PMmy_flow.with_options()
it wouldn't help if I don't define any retries in the original flow right?Marvin
08/29/2025, 12:21 PMMarvin
08/29/2025, 12:21 PMwith_options(retries=...)
will add/override retries even if the base flow has none (i.e., retries=0
).
Key points:
- with_options
returns a new Flow object with the provided overrides. It doesn’t matter if the original flow had retries=0
or didn’t specify retries at all.
- The override only takes effect if the deployment’s entrypoint points to the overridden Flow object (the result of with_options
), not the original.
Example:
from prefect import flow
@flow # no retries set here (defaults to 0)
def my_flow():
...
# Create a per-deployment variant with retries
prod_flow = my_flow.with_options(retries=3, retry_delay_seconds=10)
# Deploy the variant and ensure entrypoint is flows.py:prod_flow
prod_flow.deploy(
name="my-flow-prod",
work_pool_name="your-pool",
)
If the deployment entrypoint is flows.py:my_flow
, you’ll still get 0 retries. Make sure it’s flows.py:prod_flow
.
Docs:
- Retries in Prefect 3Idriss Bellil
08/29/2025, 12:32 PMMarvin
08/29/2025, 12:32 PMMarvin
08/29/2025, 12:32 PMmy_flow.with_options(retries=..., retry_delay_seconds=...)
- Or using deployment triggers/automations to start a new run on failure (approximate retries per deployment, but as new runs rather than retry attempts)
Refs:
- Retries in Prefect 3
- Deployments overview