Raviraja Ganta
06/23/2022, 9:58 AM
I have a `success_task` that will send an SQS message to my backend server indicating that the flow ran successfully. Now I want to send a message when a task fails during execution as well. Since the number of tasks can be high (more than 10), is there a better way to handle errors across the tasks?
Sylvain Hazard
06/23/2022, 10:00 AM
You can use the `any_failed` trigger to have a task that runs only if any upstream task has failed and not otherwise.
Raviraja Ganta
06/23/2022, 10:03 AM
Sylvain Hazard
06/23/2022, 12:17 PM
Raviraja Ganta
06/23/2022, 12:18 PM
Sylvain Hazard
06/23/2022, 12:20 PM
Kevin Kho
06/23/2022, 2:06 PM
In a state handler, the error is available as `new_state.result`. This is an overengineered version of that (but it’s the only one I have ready):
from prefect import Flow, task
import prefect


class SomeError(ValueError):
    # Custom exception that carries the task input alongside the message.
    def __init__(self, *args, x):
        super().__init__(*args)
        self.x = x


def st(task, old_state, new_state):
    # State handler: on failure, new_state.result holds the raised exception.
    if new_state.is_failed():
        prefect.context.logger.info(type(new_state.result))
        prefect.context.logger.info(new_state.result.x)
    return new_state


@task(state_handlers=[st])
def abc(x):
    # we want to keep x in state handler
    if x == 2:
        raise SomeError("test error", x=x)
    return x + 1


with Flow("..") as flow:
    abc(2)

flow.run()
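Since you have more than ten tasks, one way to avoid repeating the logic might be a single handler shared by all of them. This is only a minimal sketch, not Prefect API: `make_failure_notifier`, `send`, and the payload fields are hypothetical names, and `send` stands in for whatever actually pushes the message to SQS. It relies only on what the handler above already uses (`new_state.is_failed()`, `new_state.result`) plus `task.name`, and the check at the bottom uses stand-in objects so it runs without Prefect.

```python
import json
from types import SimpleNamespace


def make_failure_notifier(send):
    """Build a state handler that reports failed tasks through `send`.

    `send` is a hypothetical callable that accepts one string; in a real
    flow it could wrap the call that posts to the SQS queue.
    """

    def handler(task, old_state, new_state):
        if new_state.is_failed():
            error = new_state.result  # the raised exception
            payload = {
                "task": task.name,
                "status": "failed",
                "error_type": type(error).__name__,
                "error": str(error),
            }
            send(json.dumps(payload))
        # A state handler must return the state it wants to keep.
        return new_state

    return handler


# Quick check without Prefect: stand-ins mimic the handler arguments.
sent = []
notify_failure = make_failure_notifier(sent.append)
fake_task = SimpleNamespace(name="abc")
failed = SimpleNamespace(is_failed=lambda: True, result=ValueError("test error"))
returned = notify_failure(fake_task, None, failed)
print(sent[0])
```

In a flow, the same `notify_failure` object could then be passed as `state_handlers=[notify_failure]` on every task, so the reporting logic lives in one place.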
The other option is a task with the `any_failed` trigger. If you pass the upstream task as an input, it will contain the error. Both are viable approaches.
Raviraja Ganta
06/24/2022, 3:29 AM