https://prefect.io logo
p

Peter Roelants

01/28/2021, 3:14 PM
Hi Prefect, What's the recommended way of dealing with Task failures? For example, I want to write to a db table or send a message to Kafka if a certain task has failed. From the documentation it seems that there are two ways of handling failure of the task: • Implement a State Handler to deal with the failed state. • Implement a Task to deal with the failed stage and use a Trigger with
any_failed
? I'm not sure which is the most idiomatic way of dealing with this failure, since updating a database or sending a message to Kafka itself is prone to failure (and "negative engineering" to deal with that failure). Does Prefect have an opinion/guidelines how to deal with this?
Related question: What defines an "upstream task" in the trigger? Is this all direct parents? Or does it also contains the parents of the parents and so on... Looking at the definition of Edge it seems that its the former (upstream task is only the direct parents that have a direct connection to the task)?
z

Zanie

01/28/2021, 4:54 PM
Hi Peter — I believe upstream tasks for triggers are immediate parents only. I’d recommend just using the
on_failure
kwarg for
Task
, see https://docs.prefect.io/api/latest/core/task.html#task-2
With that, you define a function
message_kafka(failed_task, failed_state) -> None
that will be called on failure of a specific task.
If you want to attach something at the Flow level, then I recommend using a state handler. You can then check if the state is a failure and if the task is an instance of one of the task you care about.
p

Peter Roelants

01/28/2021, 5:31 PM
I didn't notice the
on_failure
kwarg before, thanks for pointing it out.
Is there any way to get the input of the failed task from the state?
z

Zanie

01/28/2021, 6:24 PM
Ah that’s a great question — it might be in the context? Otherwise you can capture the exception and raise your own failure state with the task inputs.
p

Peter Roelants

01/28/2021, 6:37 PM
I've been trying to look for it, It doesn't seem to be in the context, nor in the
old_state
's results 😞 The only other solution to get the inputs of the failed task indeed follows your suggestion: to add the task inputs to the failed state manually, similar as in this example using signals. However, I feel like capturing the exception and re-raising them with the tasks's input is defeating a bit of Prefect's value proposition to reduce "negative engineering". Could I make a feature request somewhere to include meta-data like the failed task's input to the Failure state?
z

Zanie

01/28/2021, 6:41 PM
This feels like a reasonable request but you do need to consider that attaching data to a state shares it with Prefect Cloud which is a big no-no in the hybrid model. There’s probably a way around this though — I suggest you open an issue in the
prefect
repo so we can discuss the best way to solve this.
p

Peter Roelants

01/28/2021, 6:46 PM
Thanks, I'll think about it a bit more and try to write something up for a feature request.
It's good that you mention that because I don't want the data to leak into the cloud either.
z

Zanie

01/28/2021, 6:48 PM
You could try placing the inputs into the context at the start of your task — I’d have to look around a bunch to see how long that persists
p

Peter Roelants

01/28/2021, 7:00 PM
Looking at the context documentation it seems like they will disapear after the task exists and leaves the context manager to set the inputs.
z

Zanie

01/28/2021, 7:07 PM
Yeah I think you’d be best served with a feature request
“Expose task inputs to state handlers”
p

Peter Roelants

02/01/2021, 9:20 AM
This question also got me thinking why state handlers are not Tasks themselves. I wrote up a short proposal to change this at https://github.com/PrefectHQ/prefect/discussions/4040