Kiran
09/04/2025, 7:26 AMMarvin
09/04/2025, 7:26 AMMarvin
09/04/2025, 7:27 AMretries
works even if you don’t set retry_delay_seconds
. If you omit retry_delay_seconds
, the delay defaults to 0 seconds, so retries happen immediately.
- Flow decorator signature shows retry_delay_seconds
is optional
- Default delay is effectively 0 (immediate retry)
Example:
from prefect import flow
import random
@flow(retries=3) # no retry_delay_seconds -> immediate retries
def my_flow():
if random.random() < 0.8:
raise RuntimeError("Transient error")
return "Success!"
if __name__ == "__main__":
my_flow()
If you want a pause between attempts, set `retry_delay_seconds`:
@flow(retries=3, retry_delay_seconds=10)
def my_flow():
...
Docs:
- Retries how-to
- Settings referenceKiran
09/05/2025, 6:53 AMMarvin
09/05/2025, 6:53 AMMarvin
09/05/2025, 6:55 AMflow_run_id
is used and the attempt count increments.
- Task retries behave the same way: same task run ID, new attempt recorded.
- Depending on your work pool/infrastructure, each retry may execute in a fresh process/container/pod, but it’s still the same Prefect run in the UI/API.
Docs: Retries guideKiran
09/05/2025, 7:22 AMMarvin
09/05/2025, 7:22 AMMarvin
09/05/2025, 7:29 AMMarvin
09/05/2025, 7:30 AMMarvin
09/05/2025, 7:31 AMreturn Failed(message="my-custom-message")
) and you’ve configured retries on the task or flow, Prefect will still retry.
- The engine treats that as a failure. If attempts remain, it moves the run to AwaitingRetry and schedules another attempt with the same run ID.
- This applies to both tasks and flows in Prefect 3.x.
Example:
from prefect import task, flow
from prefect.states import Failed
@task(retries=3, retry_delay_seconds=10)
def do_thing():
return Failed(message="my-custom-message")
@flow(retries=2, retry_delay_seconds=30)
def my_flow():
do_thing()
If you want to fail without retrying:
- Set retries=0
, or
- Use retry_condition_fn
to control when a retry should occur, or
- Raise/return a non-retriable terminal state only when you truly want to stop.
Refs:
- Retries guide: Retries guide
- States (source): states.py
- Task engine logic (source): task_engine.pyMarvin
09/05/2025, 7:35 AMFailed(message="my-custom-message")
from a task or flow, Prefect will mark that attempt as failed and, if retries are configured (and not vetoed by a retry_condition_fn
), it will transition the run to AwaitingRetry and schedule the next attempt. It’s still the same run ID; only the attempt count increments.
That said, the recommended pattern is to raise an exception instead of returning a Failed
state. Prefect will convert the exception into a Failed state for you and apply the retry policy.
Example:
from prefect import task, flow
@task(retries=2, retry_delay_seconds=10)
def do_work():
# Preferred: raise to fail and trigger retries
raise RuntimeError("something went wrong")
# Also works, but less common:
# from prefect.states import Failed
# return Failed(message="my-custom-message")
@flow(retries=1, retry_delay_seconds=30)
def my_flow():
do_work()
if __name__ == "__main__":
my_flow()
If you need to conditionally allow/skip retries based on the failure details, use retry_condition_fn
on the task/flow.
Refs:
- Retries guide: Retries in Prefect
- Tasks API (retry params): Tasks API reference
- States (source): prefect.states sourceMarvin
09/05/2025, 7:46 AMreturn Failed(message="my-custom-message")
) and you’ve configured retries on that task or flow, Prefect will still retry. The run transitions to AwaitingRetry and the same run ID is retried until attempts are exhausted.
Notes:
- Raising an exception or returning Failed(...)
are both treated as failures for retry purposes.
- If you use a retry_condition_fn
, make sure it handles the case where there is no exception (since a manually returned Failed state may not have an associated exception). Otherwise your condition might unintentionally block retries.
Example:
from prefect import task
from prefect.states import Failed
@task(retries=3, retry_delay_seconds=10)
def do_work():
return Failed(message="my-custom-message")
More details: Retries guideKiran
09/05/2025, 10:23 AMMarvin
09/05/2025, 10:23 AMMarvin
09/05/2025, 10:24 AMFailed(message=...)
-> same outcome: it’s a failure state, so it’s eligible for retry under the same policy.
- Returning a normal value (Completed) -> no retry.
Notes:
- It’s more common to raise exceptions; Prefect will handle the state and retries for you.
- Conditional retries: tasks support retry_condition_fn
to allow/skip retries based on the failure details (works for both exceptions and manually returned Failed states). Flows don’t expose retry_condition_fn
.
Docs: Retries guideKiran
09/05/2025, 10:35 AMMarvin
09/05/2025, 10:35 AMMarvin
09/05/2025, 10:38 AMflow_run_id
is created.
- On ECS Fargate Spot, each retry will typically start a fresh ECS task/container, but it is still the same Prefect flow run in the UI/API.
- If the container is preempted and the run is marked Crashed, Prefect will apply the same retry policy (if retries remain) and reschedule the same flow run.
Refs:
- Retries: Retries guide
- States: State types and transitions