Syméon del Marmol
09/03/2025, 7:59 AM
`on_failure` hook to return default values?
Marvin
09/03/2025, 8:06 AM
An `on_failure` hook cannot change a task’s return value. Hooks run after the task transitions to a failed state and are for side effects (logging, notifications, etc.), not for overriding the result or state.
What to do instead (Prefect 3.x, including 3.4.14):
Recommended patterns to continue your flow with a default after all retries are exhausted:
- Catch at await time (works great with your `.delay()` + `.result_async()` pattern)
  - Prefect will perform all configured retries; if the task still fails, `result_async()` will raise. Catch it and substitute a default.
Example: single task
fut = my_task.with_options(name=f"work-{item_id}").delay(item_id)
try:
    value = await fut.result_async()
except Exception:
    value = DEFAULT_VALUE
# continue flow with `value`
Example: many tasks concurrently with asyncio.gather
async def result_or_default(fut, default):
    try:
        return await fut.result_async()
    except Exception:
        return default

futs = [my_task.with_options(name=f"work-{i}").delay(i) for i in items]
results = await asyncio.gather(*(result_or_default(f, DEFAULT_VALUE) for f in futs))
# `results` contains real values or defaults; flow continues
- Optional: avoid raising and branch on state
  - If you prefer not to raise, you can retrieve results with failure-tolerant logic and substitute defaults based on failure. The simplest, reliable approach is the try/except above. If you’re working in sync code, `future.result(raise_on_failure=False)` is available; the async variant mirrors this behavior, but the try/except pattern is the most portable with `.result_async()`.
- Use `allow_failure` when you want downstream tasks to run even if an upstream task fails
  - `allow_failure` lets downstream work proceed despite an upstream failure. It does not change the upstream result; you still need to decide a default (e.g., by handling exceptions when you actually consume the value).
- Link: State-change hooks (docs)
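For the "avoid raising" option, here is a minimal sketch that uses only `asyncio` rather than Prefect itself. It relies on `asyncio.gather(..., return_exceptions=True)`, which returns exception objects as values instead of raising, so defaults can be substituted afterwards. `fake_result` is a hypothetical stand-in for `fut.result_async()`, and `DEFAULT_VALUE` is a placeholder chosen for illustration.

```python
import asyncio

DEFAULT_VALUE = -1  # placeholder default, chosen for illustration


async def fake_result(i: int) -> int:
    """Hypothetical stand-in for `fut.result_async()`; fails for odd inputs."""
    await asyncio.sleep(0)
    if i % 2:
        raise RuntimeError(f"task {i} failed")
    return i * 10


async def gather_with_defaults(items, default):
    # return_exceptions=True makes gather hand back exception objects
    # in place of results instead of raising the first one.
    outcomes = await asyncio.gather(
        *(fake_result(i) for i in items), return_exceptions=True
    )
    return [default if isinstance(o, Exception) else o for o in outcomes]


results = asyncio.run(gather_with_defaults([0, 1, 2, 3], DEFAULT_VALUE))
print(results)
```

One caveat with this approach: a task that legitimately returns an exception object as its value would also be replaced by the default, so the explicit try/except wrapper is the safer choice in that case.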
Notes on retries:
- Your task’s configured retries and delays apply before the final exception is raised. The try/except around `await fut.result_async()` will only trigger after all retries have been exhausted (or the task finally succeeds).
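To make the ordering concrete, here is a toy model of those semantics in plain `asyncio`. It is not Prefect's retry engine: `run_with_retries` and `flaky` are hypothetical names, and the sketch only shows that the caller's `except` branch is reached once the initial attempt and every retry have failed.

```python
import asyncio

DEFAULT_VALUE = "fallback"  # placeholder default
attempts = 0  # counts how many times the toy task body ran


async def flaky() -> str:
    """Toy task body that always fails."""
    global attempts
    attempts += 1
    raise ValueError("boom")


async def run_with_retries(fn, retries: int):
    """Toy model: run fn up to retries + 1 times, re-raising only the last error."""
    for attempt in range(retries + 1):
        try:
            return await fn()
        except Exception:
            if attempt == retries:
                raise


async def main() -> str:
    try:
        return await run_with_retries(flaky, retries=2)
    except Exception:
        # Reached only after the initial attempt plus both retries failed.
        return DEFAULT_VALUE


value = asyncio.run(main())
print(value, attempts)
```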
If you share a brief snippet of how you’re launching/awaiting your tasks now, I can tailor the pattern directly to your code.
Syméon del Marmol
09/03/2025, 8:31 AM
Marvin
09/03/2025, 8:33 AM
State-change hooks (`on_failure`, `on_completion`) are executed as part of the state transition and are awaited before the engine proceeds.
Implications:
- If you await a task or flow result (e.g., `await fut.result_async()` for a task, or a subflow/flow future), that await will not resolve until any relevant hooks for the terminal state have finished running.
- Hooks can be sync or async; async hooks are awaited.
- If multiple hooks are registered for the same transition, they run sequentially in the order provided.
- If a hook raises, Prefect logs the error and continues; hook errors do not change the run’s final state.
So in your example, if a flow has an async `on_completion` hook, awaiting that flow’s result will wait for the hook to complete before returning.
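The behavior described above can be pictured with a small toy model in plain Python. This is not Prefect's engine code; `run_hooks` and the three hooks are hypothetical names. The sketch runs hooks one at a time in the order given, awaits async ones, logs a hook error and continues, and returns the state unchanged.

```python
import asyncio
import inspect
import logging

logger = logging.getLogger("hook-sketch")
calls = []  # records the order in which hooks ran


def sync_hook(obj, run, state):
    calls.append("sync")


async def async_hook(obj, run, state):
    await asyncio.sleep(0)
    calls.append("async")


def broken_hook(obj, run, state):
    raise RuntimeError("hook failed")


async def run_hooks(hooks, obj, run, state):
    """Toy model: run hooks one at a time, awaiting async ones, logging errors."""
    for hook in hooks:
        try:
            outcome = hook(obj, run, state)
            if inspect.isawaitable(outcome):
                await outcome
        except Exception:
            logger.exception("hook %s raised; continuing", hook.__name__)
    return state  # the state is returned unchanged, whatever the hooks did


final_state = asyncio.run(
    run_hooks([sync_hook, broken_hook, async_hook], "flow", "flow_run", "COMPLETED")
)
print(calls, final_state)
```

Because `run_hooks` only returns after the last hook finishes, anything awaiting it also waits for the hooks, which mirrors the point about awaited results.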
Docs: State-change hooks
Syméon del Marmol
09/03/2025, 8:38 AM
Marvin
09/03/2025, 8:40 AM
- Flow hooks: async `on_completion`, `on_failure`, etc. are awaited before the flow proceeds/returns.
- Task hooks: async `on_completion`, `on_failure` are also supported and awaited.
Examples:
Flow hook
from prefect import flow, get_run_logger

async def on_flow_done(flow, flow_run, state):
    logger = get_run_logger()
    logger.info(f"Flow {flow.name} completed with state: {state.type}")

@flow(on_completion=[on_flow_done])
def my_flow():
    ...
Task hook
from prefect import task, get_run_logger

async def on_task_fail(task, task_run, state):
    logger = get_run_logger()
    logger.warning(f"Task {task.name} failed: {state.message}")

@task(on_failure=[on_task_fail])
def do_work(x):
    ...

# or dynamically via with_options
do_work = do_work.with_options(on_failure=[on_task_fail])
Hook signatures:
- Flow hooks receive `(flow, flow_run, state)` and can return `None` or an awaitable.
- Task hooks receive `(task, task_run, state)` and can return `None` or an awaitable.
Docs: State-change hooks