dammy arinde
01/24/2022, 3:13 PMalex
01/24/2022, 3:23 PMKevin Kho
.result
on the FAIL
exception.
So if you are using a state handler, you can do:
def state_handler(obj: Union[Task, Flow], old_state: State, new_state: State) -> Optional[State]:
res = new_state.result
...
and then attach this to your task. And then you then put the logic to update Snowflake here with the result.
If you are using a Task, you could use a downstream task to handle it:
def snowflake_task(ge_output):
if isinstance(ge_output, FAIL):
res = ge_output.result
If you are using a downstream task, you need to set trigger=always_run
dammy arinde
01/24/2022, 4:03 PMdammy arinde
01/24/2022, 4:05 PM