https://prefect.io logo
Title
k

Ken Nguyen

06/13/2022, 11:15 PM
Is there a way to make a state handler do something different if a specific task fails? For example, I have a chain of tasks that depend on each other:
a -> b ->c
I want it so that if
a
and
b
fails, it sends a Slack notifications saying “Flow failed”. But if
c
fails, I want it alter the Slack message by adding b’s output
k

Kevin Kho

06/13/2022, 11:16 PM
I think what you want is for C to just have it’s own state handler?
Or this is much easier expressed as a SlackTask with
b
as an input and
c
as an upstream with trigger
any_failed
k

Ken Nguyen

06/13/2022, 11:28 PM
I agree, I think the latter option sounds easiest. But if I wanted to have a separate state handler for a specific task, how would I be able to do that?
k

Kevin Kho

06/13/2022, 11:30 PM
You…write a new function? Not seeing why it would be different than the first? You specify it anyway at the task level right?
@task(state_handers=[...])
k

Ken Nguyen

06/13/2022, 11:43 PM
Oh! This whole time I’ve only been setting state handler at the Flow level
Thanks for the tip!
k

Kevin Kho

06/13/2022, 11:45 PM
Oh lol!
k

Ken Nguyen

06/14/2022, 12:56 AM
Is there a way I can pass on an output of a task into the state handler?
Like if a task fail, I want the state handler to include the value of a variable within the task
k

Kevin Kho

06/14/2022, 12:57 AM
Let me draft an example
❤️ 1
Did you get your swag btw?
k

Ken Nguyen

06/14/2022, 12:57 AM
Still waiting on it 😭
k

Kevin Kho

06/14/2022, 12:58 AM
What!? Let me check on that too
It’s not pretty though because you basically need to embed it in the result
from prefect import Flow, task
import prefect

class SomeError(ValueError):
    def __init__(self, *args, x):
        super().__init__(*args)
        self.x = x

def st(task, old_state, new_state):
    if new_state.is_failed():
        <http://prefect.context.logger.info|prefect.context.logger.info>(type(new_state.result))
        <http://prefect.context.logger.info|prefect.context.logger.info>(new_state.result.x)
    return new_state

@task(state_handlers=[st])
def abc(x):
    # we want to keep x in state handler
    if x == 2:
        raise SomeError("test error", x=x)
    return x+1

with Flow("..") as flow:
    abc(2)

flow.run()
Ah ok I see order is processing on the swag
k

Ken Nguyen

06/14/2022, 1:07 AM
as long as its not lost in the mail, I am happy 😃
k

Kevin Kho

06/14/2022, 1:10 AM
You don’t have to do these code shenanigans if you use a downstream task
k

Ken Nguyen

06/14/2022, 1:38 AM
Yeah youre right
I have a task that outputs a string, I only want it to trigger a downstream task if that string has length > 0. If the downstream task is triggered, I want it to print the string from the upstream task I currently have it as
if len(str) > 0: 
  raise FAIL 
return str
But it seems like when I raise FAIL, the task returns
None
instead
How do I return an output as well as raise FAIL signal to trigger the downstream task? Or better yet, is there a simpler way I could be doing this?
k

Kevin Kho

06/14/2022, 1:42 AM
SKIP
propagates to downstream (and FAIL does too so the downstream shouldn’t run)
k

Ken Nguyen

06/14/2022, 1:43 AM
Oh I forgot to mention my downstream task has trigger=any_failed
So you’re suggesting I should use
raise SKIP
instead?
k

Kevin Kho

06/14/2022, 1:50 AM
That might be an option for this use case
k

Ken Nguyen

06/14/2022, 1:53 AM
Would a SKIP trigger a downstream flow if the downstream flow has trigger=any_failed?
I also am still unclear on whether using SKIP instead of FAIL would allow the function to return a value
Oh actually I think I might be misunderstanding you, lemme try some stuff out
k

Kevin Kho

06/14/2022, 2:19 AM
Ah SKIP is successful sorry I had it mixed up in my head. I think you are trying to do B -> C -> any_failed_task where some value is passed from B to C to the any_failed_task. I am suggesting to make B an input into the any_failed task
k

Ken Nguyen

06/14/2022, 2:21 AM
Yep that makes sense, thanks!