Shreyas Gowda
08/22/2025, 9:23 AM
Marvin
08/22/2025, 9:27 AM
from prefect import task, flow

@task(retries=3, retry_delay_seconds=10)
def flaky_task():
    # Raises -> retry; succeeds -> completes
    ...

@flow
def run():
    return flaky_task()
Exponential backoff + jitter
from prefect import task, flow
from prefect.tasks import exponential_backoff

@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),  # 2, 4, 8, 16, 32 (seconds)
    retry_jitter_factor=0.2,  # jitter factor: varies each delay by up to ~20%
)
def unstable():
    ...

@flow
def run():
    unstable()
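To see where that schedule comes from, here is a minimal sketch in plain Python of the arithmetic, assuming the documented behavior that the delay before retry r is backoff_factor * 2**r. The helper name backoff_schedule is hypothetical and not part of Prefect, and jitter is left out.

```python
def backoff_schedule(backoff_factor: float, retries: int) -> list:
    """Hypothetical helper (not a Prefect API).

    Assumes delay = backoff_factor * 2**r for r = 0 .. retries - 1,
    i.e. the delays double starting from backoff_factor itself.
    """
    return [backoff_factor * 2**r for r in range(retries)]


if __name__ == "__main__":
    # Delays in seconds for backoff_factor=2 and five retries
    print(backoff_schedule(2, 5))
```

Under this assumption the first delay equals the backoff factor, so a factor of 2 starts at 2 seconds rather than 1.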
Retry only on certain errors (tasks)
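import httpx
from prefect import task, flow

def retry_on_5xx(task, task_run, state) -> bool:
    # Example: only retry HTTP 5xx errors.
    # Prefect calls retry_condition_fn with (task, task_run, state);
    # state.result() re-raises the exception that failed the attempt.
    try:
        state.result()
    except httpx.HTTPStatusError as exc:
        return 500 <= exc.response.status_code < 600
    except Exception:
        return False
    return False

@task(
    retries=4,
    retry_delay_seconds=[5, 10, 20, 40],
    retry_condition_fn=retry_on_5xx,
)
def get_data():
    ...

@flow
def run():
    get_data()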
Flow-level retries
from prefect import flow

# Flows accept retries and retry_delay_seconds; jitter is configured on tasks
@flow(retries=2, retry_delay_seconds=60)
def pipeline():
    ...
Note: Flow retries apply to the flow run itself. They do not set defaults for tasks; configure tasks separately if needed.
Override at runtime (per-call)
from prefect import task, flow

@task(retries=3, retry_delay_seconds=10)
def maybe_fails():
    ...

@flow
def run():
    # Increase retries for this call only
    more_retries = maybe_fails.with_options(retries=10, retry_delay_seconds=5)
    more_retries()
    # Or disable retries for this call
    no_retry = maybe_fails.with_options(retries=0)
    no_retry()
How retries interact with states and hooks
- On a retryable exception, the run moves to AwaitingRetry with a scheduled next start time.
- After all attempts are exhausted, it enters Failed.
- on_failure hooks fire when the run actually enters a Failed state (i.e., after the final attempt), not between retries.
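The sequence above can be sketched as a toy model in plain Python. This is not Prefect's engine: simulate_run and the state strings are hypothetical and simplified (no delays, no scheduling), and are meant only to show that the failure hook runs once, after the last attempt.

```python
from typing import Callable, List


def simulate_run(
    fn: Callable[[], object],
    retries: int,
    on_failure: Callable[[Exception], None],
) -> List[str]:
    """Toy model of the state sequence for one run (hypothetical helper)."""
    states: List[str] = []
    for attempt in range(retries + 1):  # one initial attempt plus `retries` retries
        states.append("Running")
        try:
            fn()
        except Exception as exc:
            if attempt < retries:
                states.append("AwaitingRetry")  # a retry is still available
                continue
            states.append("Failed")  # attempts exhausted
            on_failure(exc)  # hook fires only here, never between retries
            return states
        states.append("Completed")
        return states
    return states


def always_fails() -> None:
    raise RuntimeError("boom")


hook_calls: List[Exception] = []
states = simulate_run(always_fails, retries=2, on_failure=hook_calls.append)
print(states)
print(len(hook_calls))
```

With retries=2 the function is attempted three times, and the hook list ends up with a single entry.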
Want help tailoring this? Are you retrying tasks or the whole flow, and do you need exponential backoff or retries only for certain exceptions?