<@ULVA73B9P> how can I make a Prefect task retry o...
# ask-marvin
i
@Marvin how can I make a Prefect task retry only for certain exceptions and not all of them? or alternatively (even better) decide from the task itself when to keep the retry logic (easy) or fail early without going through the task configured retry logic (not clear to me how to do that)?
m
thought for 463.1 seconds
Great question. You can do both in Prefect 3.x:
1) Retry only for certain exceptions
- Use the `retry_condition_fn` parameter on the `@task` decorator.
- Prefect will call this function with the exception that caused the failure; return True to retry, False to fail immediately (no retry).
Example: only retry on transient HTTP/connection issues
from prefect import task
import requests

def only_transient_http(exc: BaseException) -> bool:
    # Retry 502/503/504 or network/timeout errors
    if isinstance(exc, requests.exceptions.HTTPError):
        status = getattr(exc.response, "status_code", None)
        return status in {502, 503, 504}
    return isinstance(exc, (requests.exceptions.ConnectionError, requests.exceptions.Timeout))

@task(
    retries=5,
    retry_delay_seconds=[5, 10, 20, 40, 60],
    retry_condition_fn=only_transient_http,
)
def fetch(url: str) -> str:
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    return r.text
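2) Decide from inside the task to skip retries (fail fast)
- Raise `prefect.exceptions.Abort` to fail immediately without consuming retries. This is a Prefect signal that bypasses retry logic. In Prefect's source, `Abort` is described as the signal raised when the API sends an 'ABORT' instruction during state proposal, so confirm on your version that raising it yourself leaves the task run in the final state you expect.
Example: fail fast for non-retryable, business-rule errors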
from prefect import task
from prefect.exceptions import Abort

@task(retries=3, retry_delay_seconds=10)
def process(record: dict):
    if not record.get("id"):
        # Non-retryable validation failure — stop immediately
        raise Abort("Invalid record: missing id")

    # If something else goes wrong and is transient, just raise the error
    # and let retry_condition_fn (if set) decide, or let Prefect retry by default
    ...
3) Combine both patterns
- Let your task raise exceptions normally for retryable cases and raise `Abort` for non-retryable cases. Your `retry_condition_fn` can then focus on filtering which “normal” exceptions should be retried.
Example:
from prefect import task
from prefect.exceptions import Abort

def retry_on_value_errors(exc: BaseException) -> bool:
    return isinstance(exc, ValueError)

@task(retries=4, retry_delay_seconds=5, retry_condition_fn=retry_on_value_errors)
def do_work(x: int):
    if x < 0:
        # Non-retryable classification decided at runtime by the task
        raise Abort("x cannot be negative; failing without retries")

    # If a ValueError occurs below, Prefect will retry
    return int("maybe_flaky_parse")
Notes and references
- In Prefect 3.x, `retry_condition_fn` is called with the raised exception; return True to retry and False to fail without retry.
- To bypass retries from inside a task, raise `Abort` from `prefect.exceptions`.
Links:
- How-to: retries https://docs-3.prefect.io/v3/how-to-guides/workflows/retries
- Task API (includes `retry_condition_fn`) https://docs-3.prefect.io/v3/api-ref/python/prefect-tasks
- Exceptions (Abort/PrefectSignal) source https://github.com/PrefectHQ/prefect/blob/main/src/prefect/exceptions.py
If you’re on Prefect 2.x, the same patterns exist, but let me know your version and I can tailor examples precisely.
i
@Marvin the signature is different from what you showed. I see `def retry_handler(task, task_run, state) -> bool:` in the docs. Given that, should I rely on the following?
def _retry_handler(task: Task, task_run: TaskRun, state: State) -> bool:
    try:
        state.result()
    except SpecialException:
        # The task can't continue retrying, this is wrong
        return False
    except Exception:
        # Proceed with the rest of the retry logic
        return True
    else:
        # Proceed with the rest of the retry logic
        return True
m
thought for 52.5 seconds
You’re right. In Prefect 3.x the retry hook signature is:
def retry_handler(task, task_run, state) -> bool:
    ...
Thanks for the correction. About your snippet: it’s close, but I’d recommend using `prefect.states.get_state_exception(state)` instead of calling `state.result()`. Calling `state.result()` can try to resolve/load results (and will raise on failure), but the intent here is just to inspect the exception that caused the failure, which `get_state_exception` does directly and clearly. Note that `get_state_exception` is defined with `async def` in Prefect's source, so confirm it can be called synchronously from a plain `def` handler in your version.
A safer/clearer version of your handler:
from prefect.states import get_state_exception


class SpecialException(Exception):
    """Placeholder for your own non-retryable error type."""


def _retry_handler(task, task_run, state) -> bool:
    exc = get_state_exception(state)

    # Nothing to retry if there's no exception on the state
    if exc is None:
        return False

    # Do not retry for your non-retryable error
    if isinstance(exc, SpecialException):
        return False

    # Otherwise, retry (or add more filters here)
    return True
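Either way, the handler is a plain function, so you can sanity-check its decisions without running a flow. Below is a minimal sketch using the docs-style `state.result()` pattern from your snippet. `FakeFailedState` is a hypothetical stand-in (not a Prefect class) that only mimics a failed state whose `result()` re-raises the stored exception, and `SpecialException` is a placeholder for your own error type.

```python
class SpecialException(Exception):
    """Placeholder for a non-retryable error."""


class FakeFailedState:
    """Hypothetical stand-in for a failed state: result() re-raises the error."""

    def __init__(self, exc: BaseException) -> None:
        self._exc = exc

    def result(self):
        raise self._exc


def retry_handler(task, task_run, state) -> bool:
    try:
        state.result()
    except SpecialException:
        return False  # stop retrying
    except Exception:
        return True  # let the configured retry policy proceed
    return True


# task and task_run are unused by this handler, so None is enough here
stop = retry_handler(None, None, FakeFailedState(SpecialException("nope")))
go = retry_handler(None, None, FakeFailedState(TimeoutError("slow")))
```

This only exercises the branching logic; it says nothing about how a real Prefect state loads its result.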
If you only want to retry certain exceptions (e.g., transient HTTP errors), tighten the logic:
from prefect.states import get_state_exception
import requests

TRANSIENT_STATUSES = {502, 503, 504}

def retry_only_transient(task, task_run, state) -> bool:
    exc = get_state_exception(state)
    if exc is None:
        return False

    # Never retry this custom case
    if isinstance(exc, SpecialException):
        return False

    # Retry for transient HTTP conditions
    if isinstance(exc, requests.exceptions.HTTPError):
        code = getattr(getattr(exc, "response", None), "status_code", None)
        return code in TRANSIENT_STATUSES

    return isinstance(exc, (requests.exceptions.ConnectionError, requests.exceptions.Timeout))
And wire it up:
from prefect import task

@task(
    retries=5,
    retry_delay_seconds=[5, 10, 20, 40, 60],
    retry_condition_fn=retry_only_transient,
)
def my_task():  # add your task's parameters here
    ...
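The delay list used above doubles each time and is capped at 60 seconds. If you would rather generate such a list than hard-code it, here is a small sketch in plain Python; `capped_backoff` is a hypothetical helper, not part of Prefect, and its result would simply be passed as `retry_delay_seconds`.

```python
from typing import List


def capped_backoff(base: float, factor: float, cap: float, retries: int) -> List[float]:
    """Return one delay per retry: base * factor**i, never exceeding cap."""
    return [min(base * factor**i, cap) for i in range(retries)]


# Same schedule as the hard-coded list in the example above
delays = capped_backoff(base=5, factor=2, cap=60, retries=5)
```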
Fail fast from inside the task (skip retries entirely)
- Raise `prefect.exceptions.Abort` from within the task when you decide it should not be retried:
from prefect import task
from prefect.exceptions import Abort

@task(retries=3, retry_delay_seconds=10)
def process(record):
    if not record.get("id"):
        # Non-retryable condition decided at runtime
        raise Abort("Invalid record: missing id")
    ...
Raising `Abort` will immediately end the task without consuming retries, regardless of the retry handler.
References:
- How-to: Retries https://docs-3.prefect.io/v3/how-to-guides/workflows/retries
- Task API (includes `retry_condition_fn`) https://docs-3.prefect.io/v3/api-ref/python/prefect-tasks
- States utilities (e.g., `get_state_exception`) https://docs-3.prefect.io/v3/api-ref/python/prefect-states
- Exceptions (Abort/PrefectSignal) source https://github.com/PrefectHQ/prefect/blob/main/src/prefect/exceptions.py
If you share the exact exceptions you want to treat as retryable/non-retryable, I can tailor the `retry_condition_fn` for your case.
i
@Marvin I have more retry logic (number of retries, wait seconds, ...), so I think I should only return False when I need to stop everything; otherwise, I want the rest of the retry logic to take over. The example I copied above is from the docs.
m
thought for 40.1 seconds
You’ve got the right mental model. In Prefect 3.x, treat `retry_condition_fn` as a gate:
- Return True → allow Prefect’s configured retry logic (retries, delay, jitter) to proceed
- Return False → stop all retries now and mark the task failed
Given that, I’d make one tweak to the docs’ example: avoid `state.result()` and inspect the exception directly. `state.result()` can try to load/deserialize results; you only need the exception. Use `prefect.states.get_state_exception(state)`.
Example “only stop on SpecialException; otherwise let retries proceed”:
from prefect.states import get_state_exception

def _retry_handler(task, task_run, state) -> bool:
    exc = get_state_exception(state)

    # No exception? Nothing to retry.
    if exc is None:
        return False

    # Stop all retries for non-retryable error(s)
    if isinstance(exc, SpecialException):
        return False

    # Otherwise, let the configured retry policy handle it
    return True
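To make the gate behavior concrete, here is a toy simulation in plain Python. It is not Prefect's engine: `run_with_retries`, `should_retry`, and `SpecialException` are hypothetical stand-ins, and the sketch only illustrates that a False from the gate ends the run even when attempts remain, while a True defers to the remaining retry budget.

```python
from typing import Callable, List


class SpecialException(Exception):
    """Hypothetical non-retryable error."""


def should_retry(exc: BaseException) -> bool:
    # Gate: only SpecialException stops the retry loop early
    return not isinstance(exc, SpecialException)


def run_with_retries(
    fn: Callable[[], object],
    retries: int,
    gate: Callable[[BaseException], bool],
) -> List[str]:
    """Toy retry loop: returns a log of what happened on each attempt."""
    log: List[str] = []
    for attempt in range(retries + 1):
        try:
            fn()
            log.append(f"attempt {attempt + 1}: success")
            return log
        except Exception as exc:
            # Retry only if budget remains AND the gate allows it
            if attempt < retries and gate(exc):
                log.append(f"attempt {attempt + 1}: retry")
            else:
                log.append(f"attempt {attempt + 1}: failed")
                return log
    return log


def always_transient():
    raise ConnectionError("flaky")


def always_special():
    raise SpecialException("bad input")


transient_log = run_with_retries(always_transient, retries=2, gate=should_retry)
special_log = run_with_retries(always_special, retries=2, gate=should_retry)
```

With two retries configured, the transient error uses all three attempts, while the special error fails on the first attempt.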
If you want to add attempt-aware logic, you can use `task_run.run_count`:
from prefect.states import get_state_exception

def _retry_handler(task, task_run, state) -> bool:
    exc = get_state_exception(state)
    if exc is None:
        return False

    # Example: after 3 attempts, stop retrying for RateLimitError
    if isinstance(exc, RateLimitError) and task_run.run_count >= 3:
        return False

    return True
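One way to keep attempt-aware rules easy to test is to move the decision into a pure helper that takes the exception and the attempt count, then call it from the handler with `task_run.run_count`. A minimal sketch; `should_retry`, `MAX_RATE_LIMIT_ATTEMPTS`, and `RateLimitError` are hypothetical names used for illustration.

```python
class RateLimitError(Exception):
    """Placeholder for your rate-limit error type."""


MAX_RATE_LIMIT_ATTEMPTS = 3  # hypothetical cutoff, matching the example above


def should_retry(exc: BaseException, run_count: int) -> bool:
    # Stop retrying rate-limit errors once the cutoff is reached
    if isinstance(exc, RateLimitError) and run_count >= MAX_RATE_LIMIT_ATTEMPTS:
        return False
    # Everything else is left to the configured retry policy
    return True


decisions = [should_retry(RateLimitError(), n) for n in (1, 2, 3, 4)]
other = should_retry(ValueError("x"), 10)
```

Inside the handler you would then return `should_retry(exc, task_run.run_count)` once you have the exception in hand.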
Wire it up on the task:
from prefect import task

@task(
    retries=5,
    retry_delay_seconds=[5, 10, 20, 40, 60],
    retry_jitter_factor=0.2,
    retry_condition_fn=_retry_handler,
)
def my_task():  # add your task's parameters here
    ...
Fail fast from inside the task (skip retries entirely):
- Raise `prefect.exceptions.Abort` to bypass retries regardless of the handler.
from prefect import task
from prefect.exceptions import Abort

@task(retries=5, retry_delay_seconds=10, retry_condition_fn=_retry_handler)
def process(record):
    if not record.get("id"):
        raise Abort("Invalid record: missing id")  # fail immediately
    ...
Why not `state.result()`?
- It can trigger result resolution/reads; `get_state_exception(state)` directly returns the recorded exception and is clearer for this use.
Refs:
- How-to: Retries docs
- Task API (`retry_condition_fn`) api
- States helpers (`get_state_exception`) states api
- Exceptions (Abort/PrefectSignal) source
If you share the specific exceptions you consider non-retryable vs. retryable, I can tailor the handler to your exact rules.