<@ULVA73B9P> How to perform retries?
# ask-marvin
@Marvin How to perform retries?
Assuming Prefect 3.x, here's how to set up retries for tasks and flows.
Key options
- retries: number of retry attempts (total attempts = 1 + retries)
- retry_delay_seconds: for tasks, a number, a list of numbers, or a callable that takes the retry count and returns a list of delays; for flows, a single number
- retry_jitter_factor (tasks only): adds random jitter to each delay (e.g. 0.1 = roughly ±10%)
- retry_condition_fn (tasks only): a function called with the task, the task run, and the failed state; return True to retry
Docs: Retries in Prefect 3.x
Task retries (basic)
from prefect import task, flow

@task(retries=3, retry_delay_seconds=10)
def flaky_task():
    # Raises -> retry; succeeds -> completes
    ...

@flow
def run():
    return flaky_task()
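To make the attempt counting concrete, here is a plain-Python sketch of what `retries=3` amounts to. It does not use Prefect: `run_with_retries` and `make_flaky` are hypothetical helpers written only for this illustration, and Prefect's engine additionally tracks run states and schedules the delay rather than sleeping inline.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int, delay_seconds: float = 0.0) -> T:
    """Call fn up to 1 + retries times, sleeping between failed attempts."""
    for attempt in range(1 + retries):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # attempts exhausted: surface the last error
            time.sleep(delay_seconds)
    raise AssertionError("unreachable for retries >= 0")


def make_flaky(failures: int):
    """Build a function that raises `failures` times, then returns "ok"."""
    calls = {"count": 0}

    def flaky() -> str:
        calls["count"] += 1
        if calls["count"] <= failures:
            raise RuntimeError(f"attempt {calls['count']} failed")
        return "ok"

    return flaky, calls


flaky, calls = make_flaky(failures=3)
result = run_with_retries(flaky, retries=3)
print(result, calls["count"])  # ok 4
```

Three failures followed by one success use all four attempts; a fourth failure would have raised instead.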
Exponential backoff + jitter
from prefect import task, flow
from prefect.tasks import exponential_backoff

@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(2),  # 2, 4, 8, 16, 32 (seconds)
    retry_jitter_factor=0.2                      # ±20% jitter
)
def unstable():
    ...

@flow
def run():
    unstable()
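For reference, the delay schedule can be reproduced without Prefect. This sketch assumes that `exponential_backoff(backoff_factor)` yields `backoff_factor * 2**r` for retry index `r` starting at 0, and that jitter keeps each delay within `delay * (1 ± factor)`. `backoff_delays` and `jitter_bounds` are hypothetical helper names for illustration, not Prefect APIs.

```python
def backoff_delays(backoff_factor: float, retries: int) -> list[float]:
    """Delays in seconds before retry 1, 2, ..., retries (assumed formula)."""
    return [backoff_factor * 2**r for r in range(retries)]


def jitter_bounds(delay: float, jitter_factor: float) -> tuple[float, float]:
    """Assumed lower and upper bound of a jittered delay."""
    lower = max(0.0, delay * (1 - jitter_factor))
    upper = delay * (1 + jitter_factor)
    return lower, upper


delays = backoff_delays(2, 5)
print(delays)  # [2, 4, 8, 16, 32]

# Show the range each delay may fall into with a jitter factor of 0.2.
for delay in delays:
    low, high = jitter_bounds(delay, 0.2)
    print(f"{delay}s -> between {low:.1f}s and {high:.1f}s")
```

Under these assumptions the factor scales the whole series, so a factor of 1 (not 2) would give 1, 2, 4, 8, 16.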
Retry only on certain errors (tasks)
from prefect import task, flow

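def retry_on_5xx(task, task_run, state) -> bool:
    # Example: only retry HTTP 5xx errors
    from httpx import HTTPStatusError
    try:
        state.result()  # re-raises the exception that failed the task
    except HTTPStatusError as exc:
        return 500 <= exc.response.status_code < 600
    except Exception:
        return False  # any other error: do not retry
    return False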

@task(
    retries=4,
    retry_delay_seconds=[5, 10, 20, 40],
    retry_condition_fn=retry_on_5xx
)
def get_data():
    ...

@flow
def run():
    get_data()
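A retry condition can be exercised on its own before wiring it into a task. The sketch below assumes the handler is called as `(task, task_run, state)` and that `state.result()` re-raises the task's exception. `FakeHTTPError` and `FakeState` are hypothetical stand-ins for `httpx.HTTPStatusError` and a Prefect state, used only so the example runs without either library.

```python
class FakeHTTPError(Exception):
    """Hypothetical stand-in for an HTTP error carrying a status code."""

    def __init__(self, status_code: int):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


class FakeState:
    """Hypothetical stand-in for a failed state: result() re-raises the error."""

    def __init__(self, exc: Exception):
        self._exc = exc

    def result(self):
        raise self._exc


def retry_on_5xx(task, task_run, state) -> bool:
    """Return True only when the task failed with a 5xx HTTP error."""
    try:
        state.result()
    except FakeHTTPError as exc:
        return 500 <= exc.status_code < 600
    except Exception:
        return False  # any other error: do not retry
    return False


# task and task_run are unused by this handler, so None is passed here.
print(retry_on_5xx(None, None, FakeState(FakeHTTPError(503))))       # True
print(retry_on_5xx(None, None, FakeState(FakeHTTPError(404))))       # False
print(retry_on_5xx(None, None, FakeState(ValueError("bad input"))))  # False
```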
Flow-level retries
from prefect import flow

@flow(retries=2, retry_delay_seconds=60)  # retry_jitter_factor is a task option, not a flow option
def pipeline():
    ...
Note: Flow retries apply to the flow run itself. They do not set defaults for tasks; configure tasks separately if needed.
Override at runtime (per-call)
from prefect import task, flow

@task(retries=3, retry_delay_seconds=10)
def maybe_fails():
    ...

@flow
def run():
    # Increase retries for this call only
    more_retries = maybe_fails.with_options(retries=10, retry_delay_seconds=5)
    more_retries()

    # Or disable retries for this call
    no_retry = maybe_fails.with_options(retries=0)
    no_retry()
How retries interact with states and hooks
- On a retryable exception, the run moves to AwaitingRetry with a scheduled next start time.
- After all attempts are exhausted, it enters Failed.
- on_failure hooks fire when the run actually enters a Failed state (i.e., after the final attempt), not between retries.
Want help tailoring this? Are you retrying tasks or the whole flow, and do you need exponential backoff or retries only for certain exceptions?