Thread
#prefect-community
    Max Kolasinski

    10 months ago
    I’m having some trouble wrapping my head around designing a conditional flow based around Great Expectations validation and was wondering if anyone had any advice. To summarize: we have an ETL Task, after which we want to run the Validation Task. If the Validation Task sets its state as Failed, we want to run a Notification Task that alerts us; otherwise we want the subsequent tasks to continue. Where I’m getting stuck is that the routing tools I’ve found don’t quite fit:
    • Using Triggers doesn’t feel correct, because I don’t want to run my Notification Task on all_failed or any_failed: if the ETL Task fails, it shouldn’t run. What I believe I would need is something like an on_x_task_failed option; the available options seem too broad to be useful.
    • I then looked into some of the ideas on the Conditional Logic page, but this seems clumsy for a few reasons. I need a Task specifically to check the State of the Validation Task, and then on our Schematic View we have additional Tasks showing up for each Case Task as well as the Merge Task. All combined, it makes our Schematic look like the image below, which seems crazy for what is effectively "if x do y".
    I feel like I have to be approaching this in completely the wrong way. If anyone has any ideas or suggestions I would be extremely grateful.
    Kevin Kho

    10 months ago
    Hey @Max Kolasinski, first off, I was just chatting with someone from the GE team and we’d love for you to show what you do with the tools together. To answer the question though, I think this can be done with a State Handler. These run as tasks change states, so now you can run some Python code when the task fails. This code can contain the Notification you want.
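One detail worth checking against the original requirement (no alert when the ETL Task itself fails): as far as I know, in Prefect 1.x a task whose upstream failed enters a TriggerFailed state, which is a subclass of Failed, so is_failed() is true for it as well. A minimal sketch of a handler that skips that case is below. The State classes here are hypothetical stand-ins so the snippet runs without Prefect installed, and make_validation_handler and notify are illustrative names; in a real flow you would presumably import Failed and TriggerFailed from prefect.engine.state instead.

```python
from typing import Callable, Optional


# Hypothetical stand-ins for the classes in prefect.engine.state,
# defined here only so the sketch is runnable on its own.
class State:
    def __init__(self, message: Optional[str] = None):
        self.message = message

    def is_failed(self) -> bool:
        return isinstance(self, Failed)


class Success(State):
    pass


class Failed(State):
    pass


class TriggerFailed(Failed):
    """State of a task that never ran because its trigger (upstream) failed."""


def make_validation_handler(notify: Callable[[str], None]):
    """Build a handler with the (task, old_state, new_state) signature."""

    def handler(task, old_state, new_state):
        # Alert only when the task itself failed, not when it was
        # skipped because an upstream task (e.g. the ETL Task) failed.
        if new_state.is_failed() and not isinstance(new_state, TriggerFailed):
            notify(f"Validation failed: {new_state.message}")
        # State handlers return the state the task should end up in.
        return new_state

    return handler


# Usage: collect alerts in a list instead of sending them anywhere.
sent = []
handler = make_validation_handler(sent.append)
handler(None, State(), Failed("2 expectations failed"))         # alerts
handler(None, State(), TriggerFailed("upstream ETL failed"))    # no alert
handler(None, State(), Success())                               # no alert
```

Passing the notifier in as a callable keeps the handler easy to exercise without a Slack webhook; swapping in a real notification call is then a one-line change.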
    Max Kolasinski

    10 months ago
    Thanks for the link. I’ll take a look at this State Handler and see if it’s the missing link for me. And I’d be happy to discuss our use case once we get this thing rolling. This is our initial test of GE so we’re still feeling things out at the moment 🙂
    Kevin Kho

    10 months ago
    A common use case for the state handler is:
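    from prefect import task
    from prefect.tasks.notifications import SlackTask

    def mystatehandler(task, old_state, new_state):
        # Send a Slack message whenever the task enters a Failed state
        if new_state.is_failed():
            SlackTask(message="test message").run()
        # Always return the state the task should end up in
        return new_state

    @task(state_handlers=[mystatehandler])
    def test():
        return 1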
    The only tricky part is getting data from the task into the state handler. For example,
    @task(state_handlers=[mystatehandler])
    def test():
        x = "some variable"
        return 1
    the mystatehandler will not be able to access x.
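One possible workaround, sketched under assumptions: the handler cannot see the task's local variables, but it does receive the new state, and a failed state carries a message and a result. In Prefect 1.x I believe a task can attach data to that state by raising signals.FAIL(message=..., result=...) from prefect.engine, though that is worth verifying against your version. The snippet below only shows the handler side and uses a SimpleNamespace as a hypothetical stand-in for the task and state objects; describe_failure and the notify parameter are illustrative names, not part of Prefect.

```python
from types import SimpleNamespace


def describe_failure(task, new_state) -> str:
    """Build an alert string from whatever the failed state carries."""
    name = getattr(task, "name", "unknown task")
    message = getattr(new_state, "message", None) or "no message"
    detail = getattr(new_state, "result", None)
    text = f"{name} failed: {message}"
    if detail is not None:
        text += f" (details: {detail!r})"
    return text


def mystatehandler(task, old_state, new_state, notify=print):
    # The handler is called with three positional arguments; `notify`
    # is an extra defaulted parameter added here for illustration.
    if new_state.is_failed():
        notify(describe_failure(task, new_state))
    return new_state


# Usage with hypothetical stand-ins for a task and its Failed state.
fake_task = SimpleNamespace(name="validate")
fake_state = SimpleNamespace(
    is_failed=lambda: True,
    message="2 expectations failed",
    result={"x": "some variable"},
)
mystatehandler(fake_task, None, fake_state)
```

The point of the design is that anything the handler needs has to travel on the state object, so the task decides what to expose at the moment it fails.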
    Max Kolasinski

    10 months ago
    Longer-term, I could see it being useful to really dig into GE’s ValidationOperatorResult to get fine-tuned control over how we respond to specific validation failures, but for right now this should work for us. Thanks for the advice!
    Andrew Black

    10 months ago
    @alex Adding Alex from Prefect, who leads our integration engineering efforts, to the thread.