https://prefect.io logo
#prefect-community
Title
# prefect-community
r

Raviraja Ganta

06/23/2022, 9:58 AM
I have multiple tasks in a flow. Upon successful execution of all the main tasks, I am triggering a tasks called
success_task
that will send a SQS message to my backend server indicating that the flow ran successfully. Now, I want to send a message when there is a failure in execution of task as well. Since the amount of tasks can be high (more than 10), is there a better to do error handling of the tasks?
s

Sylvain Hazard

06/23/2022, 10:00 AM
If I'm understanding your use case correctly, you should be able to use Triggers, likely
any_failed
to have a task that runs only if any upstream task has failed and not otherwise.
r

Raviraja Ganta

06/23/2022, 10:03 AM
Yeah I am thinking of using the same. I want to send the error message of failed task to my backend. So that I will know what task in a flow failed and why it got failed. How to get the error of the failed task?
s

Sylvain Hazard

06/23/2022, 12:17 PM
Not sure how you could do that unfortunately 😞
r

Raviraja Ganta

06/23/2022, 12:18 PM
Yep.. that's why seeking support
s

Sylvain Hazard

06/23/2022, 12:20 PM
You could try State Handlers instead, seems like they would work
Don't know if you can access logs or error messages with them though.
k

Kevin Kho

06/23/2022, 2:06 PM
If you need to preserve traceback, downstream task is easier. There is a way with state handler though. You can access the error message with
new_state.result
. This is an overengineered version of that (but it’s theonly one I have ready):
Copy code
from prefect import Flow, task
import prefect

class SomeError(ValueError):
    def __init__(self, *args, x):
        super().__init__(*args)
        self.x = x

def st(task, old_state, new_state):
    if new_state.is_failed():
        <http://prefect.context.logger.info|prefect.context.logger.info>(type(new_state.result))
        <http://prefect.context.logger.info|prefect.context.logger.info>(new_state.result.x)
    return new_state

@task(state_handlers=[st])
def abc(x):
    # we want to keep x in state handler
    if x == 2:
        raise SomeError("test error", x=x)
    return x+1

with Flow("..") as flow:
    abc(2)

flow.run()
You can also just map the downstream task with trigger
any_failed
. And if you pass the upstream_task as input, it will contain the error here. Both are viable approaches
r

Raviraja Ganta

06/24/2022, 3:29 AM
Thanks Kevin. Will try it
15 Views