Varun Srinivasan
05/26/2022, 11:15 PM
import traceback
from typing import Optional
from prefect import Flow
from prefect.engine.state import State

class MyFlow(Flow):
    def __init__(self, flow_name, state_handlers, **kwargs):
        # Handlers passed by the caller are replaced with this fixed set.
        state_handlers = [log_stdout, on_flow_start, on_flow_finish]
        super().__init__(flow_name, state_handlers=state_handlers, **kwargs)

def on_flow_finish(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if new_state.is_finished() and get_flow_state_name(new_state) == 'Failed':
        # Format the traceback of the exception held by the first task state.
        exc = list(new_state.result.values())[0].result
        exception_trace: str = "".join(traceback.format_tb(exc.__traceback__))
        # do something with exception_trace
When running it on Prefect Cloud, the new_state.result dict is empty, which results in an IndexError. When we run it locally, new_state.result comes back with one entry per task in the flow. What's the right way to trap exceptions from a task in the flow's scope?
Kevin Kho
Varun Srinivasan
05/27/2022, 4:54 PM
Kevin Kho
prefect.context.flow_run_id
Varun Srinivasan
05/27/2022, 5:02 PM