import traceback
from typing import Optional
import prefect
from prefect import Flow
from prefect.engine.state import State

class MyFlow(prefect.core.Flow):
    def __init__(self, flow_name, state_handlers, **kwargs):
        state_handlers = [log_stdout, on_flow_start, on_flow_finish]
        # Pass handlers by keyword: in Prefect 1.x, Flow's second positional parameter is schedule
        super().__init__(flow_name, state_handlers=state_handlers, **kwargs)

def on_flow_finish(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if new_state.is_finished() and get_flow_state_name(new_state) == 'Failed':
        exception_trace: str = "".join(traceback.format_list(traceback.extract_tb(list(new_state.result.values())[0].result.__traceback__)))
        ## do something with exception_trace
While running it on Prefect Cloud, the new_state.result dict is empty, resulting in an IndexError. When we run it locally, new_state.result comes back with one entry per task in the flow. What's the right way to trap exceptions from a task in the flow's scope?
Kevin Kho
05/26/2022, 11:17 PM
The result is only available on the task; I don't think the Flow result is populated. For local runs the results are held in memory, but for Cloud runs they are not, because they can be very large. So Cloud runs will behave differently in that sense.
Kevin Kho
05/26/2022, 11:18 PM
I would do this at the task level instead of the Flow level. The result is populated there, as in the example you posted.
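A minimal sketch of the task-level approach suggested above. It assumes, as the original snippet does, that a failed task state carries the raised exception in its result attribute and exposes is_failed(). FakeFailed and demo_state are hypothetical stand-ins so the sketch runs without Prefect; format_failure and on_task_failure are illustrative names, not part of Prefect.

```python
import traceback
from typing import Any, Optional


def format_failure(state: Any) -> Optional[str]:
    """Return the formatted traceback held by a failed state, or None."""
    # Assumption: a failed task state stores the raised exception as `result`.
    if not state.is_failed():
        return None
    exc = state.result
    if not isinstance(exc, BaseException):
        # Nothing usable to format (for example, the result was not kept).
        return None
    return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))


def on_task_failure(task: Any, old_state: Any, new_state: Any) -> Any:
    """Task-level state handler with the (task, old_state, new_state) shape."""
    trace = format_failure(new_state)
    if trace is not None:
        # Replace the print with real reporting (logging, alerting, ...).
        print(f"Task {getattr(task, 'name', task)!r} failed:\n{trace}")
    return new_state


class FakeFailed:
    """Hypothetical stand-in for a failed state, for local demonstration only."""

    def __init__(self, result: Any) -> None:
        self.result = result

    def is_failed(self) -> bool:
        return True


def demo_state() -> FakeFailed:
    """Build a stand-in failed state around a real, caught exception."""
    try:
        raise ValueError("boom")
    except ValueError as exc:
        return FakeFailed(exc)
```

In Prefect 1.x a handler of this shape would typically be attached per task, for example via @task(state_handlers=[on_task_failure]), so the exception is inspected where the result is actually populated.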
Varun Srinivasan
05/27/2022, 4:54 PM
Would the task inside a flow have references to the flow? As in, would I be able to access the flow_run_id from the task?
Kevin Kho
05/27/2022, 4:56 PM
Yes, by using prefect.context.flow_run_id
Kevin Kho
05/27/2022, 4:57 PM
More like there is access to the context, not the Flow object itself. You can see what is in the context here
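A small sketch of reading the run ID from the context inside a task. It assumes the context behaves like a mapping, so .get can be used to avoid an error if the key is absent. current_flow_run_id and tag_message are hypothetical helper names, and a plain dict stands in for prefect.context so the sketch runs on its own.

```python
from typing import Any, Mapping, Optional


def current_flow_run_id(context: Mapping[str, Any]) -> Optional[str]:
    """Read flow_run_id from a context mapping, returning None if absent."""
    return context.get("flow_run_id")


def tag_message(context: Mapping[str, Any], message: str) -> str:
    """Prefix a message with the flow run ID so it can be traced to a run."""
    run_id = current_flow_run_id(context) or "unknown-run"
    return f"[flow_run_id={run_id}] {message}"
```

Inside a running task, prefect.context would be passed in place of the dict, for example tag_message(prefect.context, "task failed").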