I’m having some trouble wrapping my head around de...
# ask-community
m
I’m having some trouble wrapping my head around designing a conditional flow based around Great Expectations Validation and was wondering if anyone had any advice. To summarize: we have an ETL Task, after which we want to run the Validation Task. If the Validation Task sets its state as Failed we want to run a Notification Task that alerts us, but otherwise we want the subsequent tasks to continue. Where I’m getting stuck is that the routing tools I’ve found don’t quite fit:
• Using Triggers doesn’t feel correct, because I don’t want to run my Notification Task on `all_failed` or `any_failed`. If the ETL Task fails, the Notification Task shouldn’t run. What I believe I would need is something like an `on_x_task_failed` option; the available options seem too broad to be useful.
• I then looked into some of the ideas on the Conditional Logic page, but this seems clumsy for a few reasons. I need a Task specifically to check the State of the Validation Task, and then on our Schematic View we have additional Tasks showing up for each Case Task as well as the Merge Task. All combined, it makes our Schematic look like the image below, which seems crazy for what is effectively `if x do y`.
I feel like I have to be approaching this in completely the wrong way. If anyone has any ideas or suggestions I would be extremely grateful.
k
Hey @Max Kolasinski, first off, I was just chatting with someone from the GE team and we’d love for you to show what you do with the tools together. To answer the question though, I think this can be done with a State Handler. These run as tasks change states, so now you can run some Python code when the task fails. This code can contain the Notification you want.
m
Thanks for the link- I’ll take a look at this State Handler and see if it’s the missing link for me. And I’d be happy to discuss our use case once we get this thing rolling. This is our initial test of GE so we’re still feeling things out at the moment 🙂
k
A common use case for the state handler is:
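```python
from prefect import task
from prefect.tasks.notifications import SlackTask

def mystatehandler(task, old_state, new_state):
    # Notify only when this specific task enters a Failed state
    if new_state.is_failed():
        SlackTask(message="test message").run()
    # A state handler must return the state the task should end up in
    return new_state

@task(state_handlers=[mystatehandler])
def test():
    return 1
```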
The only tricky part is getting data from the task into the state handler. For example,
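```python
@task(state_handlers=[mystatehandler])
def test():
    x = "some variable"  # local to the task's run function
    return 1
```
`mystatehandler` will not be able to access `x`, because the handler only receives the task object and its old and new states.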
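One possible way around this, as a rough sketch: have the task attach the data it wants to share to its final state, and let the handler read it from `new_state`. The code below runs without Prefect by using a hypothetical stand-in class (`FakeFailedState`) and a plain list (`sent`) in place of a Slack call. It assumes that real Prefect state objects expose `is_failed()`, `message`, and `result`, and the handler name `notify_with_context` is made up for illustration.

```python
from dataclasses import dataclass
from typing import Any, List


# Hypothetical stand-in for a Prefect Failed state, so the sketch runs without Prefect.
@dataclass
class FakeFailedState:
    message: str
    result: Any = None

    def is_failed(self) -> bool:
        return True


sent: List[str] = []  # stand-in for a real notification channel such as Slack


def notify_with_context(task, old_state, new_state):
    """State handler that builds its alert from data carried on the new state."""
    if new_state.is_failed():
        # The task's local variables are out of reach here; only what the task
        # attached to the state (message, result) is available.
        sent.append(
            f"{task} failed: {new_state.message} (details: {new_state.result})"
        )
    return new_state


# Simulate the handler being called when a task named "validate" fails.
state = FakeFailedState(message="validation failed", result={"x": "some variable"})
returned = notify_with_context("validate", None, state)
```

In a real flow the task would need to put that data on the state itself, for example by raising a failure signal that carries a message and a result. Whether that fits depends on the Prefect version in use, so treat this as a pattern to verify and not as the library's prescribed approach.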
m
Longer-term I could see it being useful to really dig into GE’s ValidationOperatorResult to have some fine-tuned control to respond to specific validation failures but for right now this should work for us. Thanks for the advice!
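As a hedged illustration of that kind of fine-grained handling, the sketch below picks out which expectations failed from a single validation result in its JSON/dict form. It assumes the usual layout of a `results` list whose entries carry `success` and `expectation_config.expectation_type`. The helper name `failed_expectations` and the sample data are made up, and a full ValidationOperatorResult would first need to be unpacked into its individual validation results.

```python
from typing import Any, Dict, List


def failed_expectations(validation_result: Dict[str, Any]) -> List[str]:
    """Return the expectation types that did not pass in one validation result."""
    failed = []
    for item in validation_result.get("results", []):
        if not item.get("success", False):
            config = item.get("expectation_config", {})
            failed.append(config.get("expectation_type", "unknown"))
    return failed


# Made-up sample data for illustration only.
sample = {
    "success": False,
    "results": [
        {
            "success": True,
            "expectation_config": {"expectation_type": "expect_column_to_exist"},
        },
        {
            "success": False,
            "expectation_config": {
                "expectation_type": "expect_column_values_to_not_be_null"
            },
        },
    ],
}

failures = failed_expectations(sample)
```

A notification step could then branch on the contents of `failures`, for example alerting only for a chosen subset of expectation types.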
a
@alex Adding Alex from Prefect to the thread who leads our integration engineering efforts.