Idriss Bellil
10/13/2025, 1:48 PMMarvin
10/13/2025, 1:48 PMMarvin
10/13/2025, 1:55 PMretry_condition_fn parameter on the @task decorator.
- Prefect will call this function with the exception that caused the failure; return True to retry, False to fail immediately (no retry).
Example: only retry on transient HTTP/connection issues
from prefect import task
import requests
def only_transient_http(exc: BaseException) -> bool:
# Retry 502/503/504 or network/timeout errors
if isinstance(exc, requests.exceptions.HTTPError):
status = getattr(exc.response, "status_code", None)
return status in {502, 503, 504}
return isinstance(exc, (requests.exceptions.ConnectionError, requests.exceptions.Timeout))
@task(
retries=5,
retry_delay_seconds=[5, 10, 20, 40, 60],
retry_condition_fn=only_transient_http,
)
def fetch(url: str) -> str:
r = requests.get(url, timeout=10)
r.raise_for_status()
return r.text
2) Decide from inside the task to skip retries (fail fast)
- Raise prefect.exceptions.Abort to fail immediately without consuming retries. This is a Prefect signal that bypasses retry logic.
Example: fail fast for non-retryable, business-rule errors
from prefect import task
from prefect.exceptions import Abort
@task(retries=3, retry_delay_seconds=10)
def process(record: dict):
if not record.get("id"):
# Non-retryable validation failure — stop immediately
raise Abort("Invalid record: missing id")
# If something else goes wrong and is transient, just raise the error
# and let retry_condition_fn (if set) decide, or let Prefect retry by default
...
3) Combine both patterns
- Let your task raise exceptions normally for retryable cases and raise Abort for non-retryable cases. Your retry_condition_fn can then focus on filtering which “normal” exceptions should be retried.
Example:
from prefect import task
from prefect.exceptions import Abort
def retry_on_value_errors(exc: BaseException) -> bool:
return isinstance(exc, ValueError)
@task(retries=4, retry_delay_seconds=5, retry_condition_fn=retry_on_value_errors)
def do_work(x: int):
if x < 0:
# Non-retryable classification decided at runtime by the task
raise Abort("x cannot be negative; failing without retries")
# If a ValueError occurs below, Prefect will retry
return int("maybe_flaky_parse")
Notes and references
- In Prefect 3.x, retry_condition_fn is called with the raised exception; return True to retry and False to fail without retry.
- To bypass retries from inside a task, raise Abort from prefect.exceptions.
Links:
- How-to: retries https://docs-3.prefect.io/v3/how-to-guides/workflows/retries
- Task API (includes retry_condition_fn) https://docs-3.prefect.io/v3/api-ref/python/prefect-tasks
- Exceptions (Abort/PrefectSignal) source https://github.com/PrefectHQ/prefect/blob/main/src/prefect/exceptions.py
If you’re on Prefect 2.x, the same patterns exist, but let me know your version and I can tailor examples precisely.Idriss Bellil
10/13/2025, 2:18 PMdef retry_handler(task, task_run, state) -> bool: in the docs, give that, should I rely on the following:
def _retry_handler(task: Task, task_run: TaskRun, state: State) -> bool:
try:
state.result()
except SpecialException:
# The task can't continue retrying, this is wrong
return False
except Exception:
# Proceed with the rest of the retry logic
return True
else:
# Proceed with the rest of the retry logic
return TrueMarvin
10/13/2025, 2:19 PMMarvin
10/13/2025, 2:19 PMdef retry_handler(task, task_run, state) -> bool:
...
Thanks for the correction.
About your snippet: it’s close, but I’d recommend using prefect.states.get_state_exception(state) instead of calling state.result(). Calling state.result() can try to resolve/load results (and will raise on failure), but the intent here is just to inspect the exception that caused the failure, which get_state_exception does directly and clearly.
A safer/clearer version of your handler:
from prefect.states import get_state_exception
def _retry_handler(task, task_run, state) -> bool:
exc = get_state_exception(state)
# Nothing to retry if there's no exception on the state
if exc is None:
return False
# Do not retry for your non-retryable error
if isinstance(exc, SpecialException):
return False
# Otherwise, retry (or add more filters here)
return True
If you only want to retry certain exceptions (e.g., transient HTTP errors), tighten the logic:
from prefect.states import get_state_exception
import requests
TRANSIENT_STATUSES = {502, 503, 504}
def retry_only_transient(task, task_run, state) -> bool:
exc = get_state_exception(state)
if exc is None:
return False
# Never retry this custom case
if isinstance(exc, SpecialException):
return False
# Retry for transient HTTP conditions
if isinstance(exc, requests.exceptions.HTTPError):
code = getattr(getattr(exc, "response", None), "status_code", None)
return code in TRANSIENT_STATUSES
return isinstance(exc, (requests.exceptions.ConnectionError, requests.exceptions.Timeout))
And wire it up:
from prefect import task
@task(
retries=5,
retry_delay_seconds=[5, 10, 20, 40, 60],
retry_condition_fn=retry_only_transient,
)
def my_task(...):
...
Fail fast from inside the task (skip retries entirely)
- Raise prefect.exceptions.Abort from within the task when you decide it should not be retried:
from prefect import task
from prefect.exceptions import Abort
@task(retries=3, retry_delay_seconds=10)
def process(record):
if not record.get("id"):
# Non-retryable condition decided at runtime
raise Abort("Invalid record: missing id")
...
Raising Abort will immediately end the task without consuming retries, regardless of the retry handler.
References:
- How-to: Retries https://docs-3.prefect.io/v3/how-to-guides/workflows/retries
- Task API (includes retry_condition_fn) https://docs-3.prefect.io/v3/api-ref/python/prefect-tasks
- States utilities (e.g., get_state_exception) https://docs-3.prefect.io/v3/api-ref/python/prefect-states
- Exceptions (Abort/PrefectSignal) source https://github.com/PrefectHQ/prefect/blob/main/src/prefect/exceptions.py
If you share the exact exceptions you want to treat as retryable/non-retryable, I can tailor the retry_condition_fn for your case.Idriss Bellil
10/13/2025, 2:23 PMMarvin
10/13/2025, 2:23 PMMarvin
10/13/2025, 2:24 PMretry_condition_fn as a gate:
- Return True → allow Prefect’s configured retry logic (retries, delay, jitter) to proceed
- Return False → stop all retries now and mark the task failed
Given that, I’d make one tweak to the docs’ example: avoid state.result() and inspect the exception directly. state.result() can try to load/deserialize results; you only need the exception. Use prefect.states.get_state_exception(state).
Example “only stop on SpecialException; otherwise let retries proceed”:
from prefect.states import get_state_exception
def _retry_handler(task, task_run, state) -> bool:
exc = get_state_exception(state)
# No exception? Nothing to retry.
if exc is None:
return False
# Stop all retries for non-retryable error(s)
if isinstance(exc, SpecialException):
return False
# Otherwise, let the configured retry policy handle it
return True
If you want to add attempt-aware logic, you can use `task_run.run_count`:
from prefect.states import get_state_exception
def _retry_handler(task, task_run, state) -> bool:
exc = get_state_exception(state)
if exc is None:
return False
# Example: after 3 attempts, stop retrying for RateLimitError
if isinstance(exc, RateLimitError) and task_run.run_count >= 3:
return False
return True
Wire it up on the task:
from prefect import task
@task(
retries=5,
retry_delay_seconds=[5, 10, 20, 40, 60],
retry_jitter_factor=0.2,
retry_condition_fn=_retry_handler,
)
def my_task(...):
...
Fail fast from inside the task (skip retries entirely):
- Raise prefect.exceptions.Abort to bypass retries regardless of the handler.
from prefect import task
from prefect.exceptions import Abort
@task(retries=5, retry_delay_seconds=10, retry_condition_fn=_retry_handler)
def process(record):
if not record.get("id"):
raise Abort("Invalid record: missing id") # fail immediately
...
Why not state.result()?
- It can trigger result resolution/reads; get_state_exception(state) directly returns the recorded exception and is clearer for this use.
Refs:
- How-to: Retries docs
- Task API (retry_condition_fn) api
- States helpers (get_state_exception) states api
- Exceptions (Abort/PrefectSignal) source
If you share the specific exceptions you consider non-retryable vs. retryable, I can tailor the handler to your exact rules.