hey <@ULVA73B9P>, if iam using retires and not ret...
# ask-community
k
hey @Marvin, if I am using retries but not the retry_delay_seconds parameter with the flow decorator, do retries still work?
m
Yes. In Prefect 3.x, `retries` works even if you don't set `retry_delay_seconds`. If you omit `retry_delay_seconds`, the delay defaults to 0 seconds, so retries happen immediately.
- The flow decorator signature shows `retry_delay_seconds` is optional
- The default delay is effectively 0 (immediate retry)

Example:
```python
from prefect import flow
import random

@flow(retries=3)  # no retry_delay_seconds -> immediate retries
def my_flow():
    if random.random() < 0.8:
        raise RuntimeError("Transient error")
    return "Success!"

if __name__ == "__main__":
    my_flow()
```
If you want a pause between attempts, set `retry_delay_seconds`:
```python
from prefect import flow

@flow(retries=3, retry_delay_seconds=10)  # wait 10 seconds between attempts
def my_flow():
    ...
```
Docs:
- Retries how-to
- Settings reference
k
@Marvin do retries create a new run altogether, or just rerun the same flow run?
m
Short answer: retries rerun the same flow run; they do not create a new flow run.
- Prefect marks the run as AwaitingRetry, schedules it again, and re-executes it; the same `flow_run_id` is used and the attempt count increments.
- Task retries behave the same way: same task run ID, new attempt recorded.
- Depending on your work pool/infrastructure, each retry may execute in a fresh process/container/pod, but it's still the same Prefect run in the UI/API.

Docs: Retries guide
k
@Marvin so even if I manipulate a run, like return Failed(message="my-custom-message"), do retries still happen?
m
Yes. If you intentionally set the state to Failed (e.g., `return Failed(message="my-custom-message")`) and you've configured retries on the task or flow, Prefect will still retry.
- The engine treats that as a failure. If attempts remain, it moves the run to AwaitingRetry and schedules another attempt with the same run ID.
- This applies to both tasks and flows in Prefect 3.x.

Example:
```python
from prefect import task, flow
from prefect.states import Failed

@task(retries=3, retry_delay_seconds=10)
def do_thing():
    return Failed(message="my-custom-message")

@flow(retries=2, retry_delay_seconds=30)
def my_flow():
    do_thing()
```
If you want to fail without retrying:
- Set `retries=0`, or
- Use `retry_condition_fn` to control when a retry should occur, or
- Raise/return a non-retriable terminal state only when you truly want to stop.

Refs:
- Retries guide
- States (source): states.py
- Task engine logic (source): task_engine.py
Yes. If you return `Failed(message="my-custom-message")` from a task or flow, Prefect will mark that attempt as failed and, if retries are configured (and not vetoed by a `retry_condition_fn`), it will transition the run to AwaitingRetry and schedule the next attempt. It's still the same run ID; only the attempt count increments.

That said, the recommended pattern is to raise an exception instead of returning a `Failed` state. Prefect will convert the exception into a Failed state for you and apply the retry policy.

Example:
```python
from prefect import task, flow

@task(retries=2, retry_delay_seconds=10)
def do_work():
    # Preferred: raise to fail and trigger retries
    raise RuntimeError("something went wrong")

    # Also works, but less common:
    # from prefect.states import Failed
    # return Failed(message="my-custom-message")

@flow(retries=1, retry_delay_seconds=30)
def my_flow():
    do_work()

if __name__ == "__main__":
    my_flow()
```
If you need to conditionally allow/skip retries based on the failure details, use `retry_condition_fn` on the task (flows do not expose this parameter).

Refs:
- Retries guide: Retries in Prefect
- Tasks API (retry params): Tasks API reference
- States (source): prefect.states source
Yes. If you return a Prefect Failed state (e.g., `return Failed(message="my-custom-message")`) and you've configured retries on that task or flow, Prefect will still retry. The run transitions to AwaitingRetry and the same run ID is retried until attempts are exhausted.

Notes:
- Raising an exception or returning `Failed(...)` are both treated as failures for retry purposes.
- If you use a `retry_condition_fn`, make sure it handles the case where there is no exception (since a manually returned Failed state may not have an associated exception). Otherwise your condition might unintentionally block retries.

Example:
```python
from prefect import task
from prefect.states import Failed

@task(retries=3, retry_delay_seconds=10)
def do_work():
    return Failed(message="my-custom-message")
```
More details: Retries guide
k
@Marvin so is only a raised exception considered for retries, and a returned state is not?
m
Not just exceptions. Prefect will retry on any failure state when `retries` > 0.
- Raising an exception -> Prefect marks the run Failed and, if retries remain, moves it to AwaitingRetry and re-attempts.
- Returning `Failed(message=...)` -> same outcome: it's a failure state, so it's eligible for retry under the same policy.
- Returning a normal value (Completed) -> no retry.

Notes:
- It's more common to raise exceptions; Prefect will handle the state and retries for you.
- Conditional retries: tasks support `retry_condition_fn` to allow/skip retries based on the failure details (works for both exceptions and manually returned Failed states). Flows don't expose `retry_condition_fn`.

Docs: Retries guide
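The conditional-retry note above can be sketched as a small handler. This is an illustration, not Prefect's own logic: it uses the `(task, task_run, state) -> bool` signature that `retry_condition_fn` expects and reads only `state.message`, so it does not assume an exception is attached to the failed state. The `permanent:` message prefix is a hypothetical convention invented for this example, and the `SimpleNamespace` objects are stand-ins for real Prefect states.

```python
from types import SimpleNamespace

# Hypothetical convention for this sketch: failure messages starting with
# this prefix mark errors that should not be retried.
PERMANENT_PREFIX = "permanent:"


def retry_unless_permanent(task, task_run, state) -> bool:
    """Return True to allow a retry, False to skip it.

    Only state.message is inspected, so a manually returned Failed state
    with no associated exception is handled the same way as any other.
    """
    message = (getattr(state, "message", None) or "").strip().lower()
    return not message.startswith(PERMANENT_PREFIX)


if __name__ == "__main__":
    # Stand-ins for Prefect state objects, used only to exercise the function.
    transient = SimpleNamespace(message="my-custom-message")
    permanent = SimpleNamespace(message="permanent: bad credentials")
    print(retry_unless_permanent(None, None, transient))  # True
    print(retry_unless_permanent(None, None, permanent))  # False
```

With Prefect installed, the handler would be attached to a task as `@task(retries=3, retry_condition_fn=retry_unless_permanent)`.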
k
@Marvin my flows run on ECS Fargate Spot, so when a flow fails, will the retry happen within the same flow run, or will a different flow run be created?
m
Short answer: same flow run ID. Retries do not create a new flow run; they reschedule the existing run and increment its attempt count.
- When a flow fails (e.g., raises an exception or returns a Failed state) and you've configured flow retries, Prefect moves that same flow run to AwaitingRetry and then re-executes it. No new `flow_run_id` is created.
- On ECS Fargate Spot, each retry will typically start a fresh ECS task/container, but it is still the same Prefect flow run in the UI/API.
- If the container is preempted and the run is marked Crashed, Prefect will apply the same retry policy (if retries remain) and reschedule the same flow run.

Refs:
- Retries: Retries guide
- States: State types and transitions