<@ULVA73B9P> I'd like to catch exceptions after re...
# marvin-ai
s
@Marvin I'd like to catch exceptions after a task's retries and return default values, so the flow continues when a task fails. Can I use the `on_failure` hook to return default values?
m
thought for 389.8 seconds
Short answer: No. An `on_failure` hook cannot change a task's return value. Hooks run after the task transitions to a failed state and are for side effects (logging, notifications, etc.), not for overriding the result or state.

What to do instead (Prefect 3.x, including 3.4.14)

Recommended patterns to continue your flow with a default after all retries are exhausted:
- Catch at await time (works great with your `.delay()` + `.result_async()` pattern)
  - Prefect will perform all configured retries; if the task still fails, `result_async()` will raise. Catch it and substitute a default.

Example: single task
fut = my_task.with_options(name=f"work-{item_id}").delay(item_id)

try:
    value = await fut.result_async()
except Exception:
    value = DEFAULT_VALUE
# continue flow with `value`
Example: many tasks concurrently with asyncio.gather
async def result_or_default(fut, default):
    try:
        return await fut.result_async()
    except Exception:
        return default

futs = [my_task.with_options(name=f"work-{i}").delay(i) for i in items]
results = await asyncio.gather(*(result_or_default(f, DEFAULT_VALUE) for f in futs))
# `results` contains real values or defaults; flow continues
- Optional: avoid raising and branch on state
  - If you prefer not to raise, you can retrieve results with failure-tolerant logic and substitute defaults based on failure. The simplest, reliable approach is the try/except above. If you're working in sync code, `future.result(raise_on_failure=False)` is available; the async variant mirrors this behavior, but the try/except pattern is the most portable with `.result_async()`.
- Use `allow_failure` when you want downstream tasks to run even if an upstream task fails
  - `allow_failure` lets downstream work proceed despite an upstream failure. It does not change the upstream result; you still need to decide a default (e.g., by handling exceptions when you actually consume the value).
- Link: State-change hooks (docs)

Notes on retries:
- Your task's configured `retries` and delays apply before the final exception is raised. `await fut.result_async()` only returns or raises once the task reaches a final state, so the `except` branch runs only after all retries have been exhausted; if the task eventually succeeds on a retry, you get its value and the `except` branch never runs.

If you share a brief snippet of how you're launching/awaiting your tasks now, I can tailor the pattern directly to your code.
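As a variation on the gather example above, `asyncio.gather(..., return_exceptions=True)` collects failures in place instead of raising the first one, and the defaults can be substituted afterwards. This is a minimal standard-library sketch, not Prefect code: `fake_task`, `gather_with_defaults`, `DEFAULT_VALUE`, and the "odd inputs fail" rule are hypothetical stand-ins for awaiting `fut.result_async()` on real futures.

```python
import asyncio

DEFAULT_VALUE = -1  # hypothetical default used when a task fails


async def fake_task(i: int) -> int:
    """Stand-in for `await fut.result_async()`; fails for odd inputs."""
    await asyncio.sleep(0)
    if i % 2:
        raise ValueError(f"item {i} failed")
    return i * 10


async def gather_with_defaults(items, default):
    # return_exceptions=True puts each exception in the result list
    # at the position of the awaitable that raised it.
    outcomes = await asyncio.gather(
        *(fake_task(i) for i in items), return_exceptions=True
    )
    # Swap every ordinary exception for the default; keep real values.
    return [default if isinstance(o, Exception) else o for o in outcomes]


results = asyncio.run(gather_with_defaults([0, 1, 2, 3], DEFAULT_VALUE))
```

Compared with a per-future wrapper, this keeps the exception objects available (in `outcomes`) if you want to log them before substituting the default.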
s
@Marvin are the state change hooks (e.g. on_failure and on_completion) fully executed before a flow continues? For instance: I await a flow run and that flow has an async "on_completion" hook registered. Will the await wait for the hook to return?
m
thought for 97.2 seconds
Yes. In Prefect 3.x, state change hooks (e.g., `on_failure`, `on_completion`) are executed as part of the state transition and are awaited before the engine proceeds.

Implications:
- If you await a task or flow result (e.g., `await fut.result_async()` for a task, or a subflow/flow future), that await will not resolve until any relevant hooks for the terminal state have finished running.
- Hooks can be sync or async; async hooks are awaited.
- If multiple hooks are registered for the same transition, they run sequentially in the order provided.
- If a hook raises, Prefect logs the error and continues; hook errors do not change the run's final state.

So in your example, if a flow has an async `on_completion` hook, `await`ing that flow's result will wait for the hook to complete before returning.

Docs: State-change hooks
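To make the ordering described above concrete, here is a toy model of those semantics in plain asyncio. It is not Prefect's engine code and uses no Prefect APIs: `run_hooks`, `finish_run`, and the three example hooks are hypothetical names, and the single `state` argument is a simplification of the real hook signature.

```python
import asyncio
import inspect
import logging

logger = logging.getLogger("hook_sketch")


async def run_hooks(hooks, *args):
    """Toy model: run hooks in order, await async ones, log and swallow errors."""
    for hook in hooks:
        try:
            outcome = hook(*args)
            if inspect.isawaitable(outcome):
                await outcome  # async hooks finish before the next hook starts
        except Exception:
            # A failing hook is logged; it does not stop later hooks
            # or alter the state that is returned to the caller.
            logger.exception("hook %r failed", getattr(hook, "__name__", hook))


calls = []


def sync_hook(state):
    calls.append(("sync", state))


def broken_hook(state):
    raise RuntimeError("boom")


async def async_hook(state):
    await asyncio.sleep(0)
    calls.append(("async", state))


async def finish_run():
    state = "COMPLETED"
    await run_hooks([sync_hook, broken_hook, async_hook], state)
    return state  # the caller's await resolves only after the hooks have run


final_state = asyncio.run(finish_run())
```

In this model, whoever awaits `finish_run()` sees the result only after both working hooks have recorded their call, and the error in `broken_hook` leaves `final_state` unchanged.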
s
@Marvin can the hook be an async function ?
m
thought for 70.0 seconds
Yes, hooks can be async functions, and Prefect will await them.
- Flow hooks: async `on_completion`, `on_failure`, etc. are awaited before the flow proceeds or returns.
- Task hooks: async `on_completion`, `on_failure` are also supported and awaited.

Examples:

Flow hook
from prefect import flow, get_run_logger

async def on_flow_done(flow, flow_run, state):
    logger = get_run_logger()
    logger.info(f"Flow {flow.name} completed with state: {state.type}")

@flow(on_completion=[on_flow_done])
def my_flow():
    ...
Task hook
from prefect import task, get_run_logger

async def on_task_fail(task, task_run, state):
    logger = get_run_logger()
    logger.warning(f"Task {task.name} failed: {state.message}")

@task(on_failure=[on_task_fail])
def do_work(x):
    ...

# or dynamically via with_options
do_work = do_work.with_options(on_failure=[on_task_fail])

Hook signatures:
- Flow hooks receive `(flow, flow_run, state)` and can return `None` or an awaitable.
- Task hooks receive `(task, task_run, state)` and can return `None` or an awaitable.

Docs: State-change hooks