Jocelyn Boullier
05/20/2021, 3:43 PM
Kevin Kho
terminal_state_handlers that can be attached to the Flow. This will be called when the flow ends and you can get the failed tasks from it.
Kevin Kho
from prefect import task, Flow
from prefect.engine.state import State, TriggerFailed
from typing import Set, Optional


def custom_terminal_state_handler(
    flow: Flow,
    state: State,
    reference_task_states: Set[State],
) -> Optional[State]:
    # Called when the flow run ends; only report when the flow failed.
    if state.is_failed():
        # state.result maps each task to its final state.
        for task, task_state in state.result.items():
            if not task_state.is_failed():
                continue
            # TriggerFailed means the task never ran because an upstream task failed.
            if isinstance(task_state, TriggerFailed):
                print(f"Task {task.name} failed because an upstream failed")
                continue
            # For a task that raised, the state's result holds the exception.
            print(f"Task {task.name} failed with exception {task_state.result!r}")
    return state


@task
def abc(x):
    return x


@task
def bcd(x):
    # Fails on purpose so the handler has something to report.
    raise ValueError("Foo!")


@task
def cde(x):
    return x


with Flow(
    "terminal_handler", terminal_state_handler=custom_terminal_state_handler
) as flow:
    a = abc(1)
    b = bcd(a)
    c = cde(b)

flow.run()
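If you would rather collect the failures than print them (for example to send one notification), the loop in the handler can be pulled out into a small helper. This is only a sketch: `summarize_failures` and the `Fake*` classes are hypothetical names, not part of Prefect, and the stand-ins exist solely so the snippet runs without Prefect installed. It assumes the mapping looks like `state.result` above: keys with a `.name`, values with `.is_failed()` and `.result`.

```python
from typing import Any, Dict, Mapping


def summarize_failures(
    task_states: Mapping[Any, Any], trigger_failed_type: type
) -> Dict[str, str]:
    """Return {task name: reason} for every failed task in the mapping."""
    summary: Dict[str, str] = {}
    for task, task_state in task_states.items():
        if not task_state.is_failed():
            continue
        if isinstance(task_state, trigger_failed_type):
            # The task did not run because something upstream failed.
            summary[task.name] = "upstream failed"
        else:
            # Assumption: a failed state's result holds the raised exception.
            summary[task.name] = f"exception {task_state.result!r}"
    return summary


# Hypothetical stand-ins for Prefect's Task and State objects.
class FakeTask:
    def __init__(self, name: str) -> None:
        self.name = name


class FakeState:
    def __init__(self, failed: bool, result: Any = None) -> None:
        self._failed = failed
        self.result = result

    def is_failed(self) -> bool:
        return self._failed


class FakeTriggerFailed(FakeState):
    pass


fake_states = {
    FakeTask("abc"): FakeState(False, 1),
    FakeTask("bcd"): FakeState(True, ValueError("Foo!")),
    FakeTask("cde"): FakeTriggerFailed(True),
}
summary = summarize_failures(fake_states, FakeTriggerFailed)
print(summary)
```

Inside the real handler the call would presumably be `summarize_failures(state.result, TriggerFailed)`, which only helps when `state.result` is actually populated.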
Kevin Kho
Jocelyn Boullier
05/20/2021, 3:57 PM
Jocelyn Boullier
05/21/2021, 4:21 PM
state.result.items() is properly populated. But when running on prod, in a k8s job, state.result is an empty dict. reference_task_states isn't empty, but I don't know how to correlate its states with the actual tasks (like with state.result.items()).
Jocelyn Boullier
05/21/2021, 4:21 PM
KubernetesRun
Kevin Kho
Kevin Kho
Jocelyn Boullier
05/24/2021, 1:52 PM
Jocelyn Boullier
05/24/2021, 1:52 PM