Hi.. I used <https://gist.github.com/anna-geller/2...
# prefect-community
Hi.. I used https://gist.github.com/anna-geller/2014180ee5eaec9ea54f4d3f5b98ca93 as a reference to get exception logs.
Copy code
class MyFlow(prefect.core.Flow):
    def __init__(flow_name, state_handlers, **kwargs):
        state_handlers = [log_stdout, on_flow_start, on_flow_finish]
        super().__init__(flow_name, state_handlers, **kwargs)

def on_flow_finish(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if new_state.is_finished() and get_flow_state_name(new_state) == 'Failed':
        exception_trace: str = "".join(traceback.format_list(traceback.extract_tb(list(new_state.result.values())[0].result.__traceback__)))
        ## do something with exception_trace
While running it on prefect cloud, the
dict is empty resulting in a
. When we run it locally, the new_result.result comes back with a length of the number of tasks in the flow. What's the right way to trap exceptions from a task in the flow's scope?
The result will only be available on the task. I don’t think the Flow result is populated. So for local runs, the result is held, but in Cloud runs, results are not held in memory because they can be overly big. So Cloud runs will be different in that sense.
I would do this at the task level instead of the Flow level. The result is populated there like the example you posted
gratitude thank you 1
Would the task inside a flow have references to the flow? As in, would I be able to access the flow_run_id from the task?
Yes by doing
More like there is access to the context, not the Flow object itself. You can see what is in the context here
Perfect. Thank you!