Thread
#prefect-community
    alex

    2 years ago
    Hello, I have a bunch of tasks called sequentially in my flow. If any of them fails, I want to execute a failure_recovery task. What would be the most concise way of expressing this? Also, sometimes I may want to execute the failure_recovery task directly (this is specified as a param).
    nicholas

    2 years ago
    Hi @alex - I'd recommend you take a look at https://docs.prefect.io/core/concepts/execution.html#triggers, which deals with State-driven Task triggers; I believe this will handle your use case!
    alex

    2 years ago
    Thanks for the reply, nicholas. I checked them out, but I feel like I'm doing something wrong. Here is a code snippet:
    run_feed = RunFeed()
    do_grouping = DoGrouping()
    reuse_data = ReuseExistingData(trigger=triggers.any_failed)
    
    with Flow(...):
       run_feed(feed)
       do_grouping(feed, upstream_tasks=[run_feed])
    
       # if any failure, recover
       reuse_data(feed, upstream_tasks=[run_feed, do_grouping])
    This is what my DAG looks like, which is a bit unexpected. Also, it looks like each method is being run twice, and fails the first time because run is missing the required arg feed.
    nicholas

    2 years ago
    @alex it looks like you're passing separate instances of your RunFeed class to each of your tasks. Try passing a single reference instead, like this:
    do_grouping = DoGrouping()
    reuse_data = ReuseExistingData(trigger=triggers.any_failed)
    
    with Flow(...):
       run_feed = RunFeed()(feed)
       do_grouping(feed, upstream_tasks=[run_feed])
       # if any failure, recover
       reuse_data(feed, upstream_tasks=[run_feed, do_grouping])
    alex

    2 years ago
    I see, thanks for your help! 🙂
    nicholas

    2 years ago
    yup yup! You'll want to do the same for your DoGrouping task as well.
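
A rough, plain-Python model of why the original snippet produced duplicate nodes, and why binding the task inside the flow avoids it. It assumes, as the reply in this thread suggests, that calling a task instance inside a flow yields a separate bound copy, so the original instance later named in upstream_tasks is a different task that never received its arguments. ToyTask, ToyFlow, bind, and unbound are hypothetical names used only for illustration; none of this is Prefect's API or implementation.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a flow: just an ordered list of tasks."""

    def __init__(self):
        self.tasks = []

    def add(self, task):
        # Register a task once, comparing by identity rather than by name.
        if all(task is not existing for existing in self.tasks):
            self.tasks.append(task)

    def unbound(self):
        # Names of tasks that were registered without ever receiving arguments.
        return [t.name for t in self.tasks if t.args is None]


class ToyTask:
    """Hypothetical stand-in for a task instance."""

    def __init__(self, name):
        self.name = name
        self.args = None  # None means "never bound to arguments"

    def bind(self, flow, *args, upstream_tasks=()):
        # Assumption: binding creates a COPY; the original stays unbound.
        bound = copy.copy(self)
        bound.args = args
        for upstream in upstream_tasks:
            flow.add(upstream)  # an unseen upstream object becomes its own node
        flow.add(bound)
        return bound


def build_original():
    # Mirrors the first snippet: the bound copy is discarded, and the
    # unbound original is what gets listed as an upstream task.
    flow = ToyFlow()
    run_feed = ToyTask("RunFeed")
    do_grouping = ToyTask("DoGrouping")
    run_feed.bind(flow, "feed")
    do_grouping.bind(flow, "feed", upstream_tasks=[run_feed])
    return flow


def build_fixed():
    # Mirrors the suggested fix: keep the reference returned by binding.
    flow = ToyFlow()
    run_feed = ToyTask("RunFeed").bind(flow, "feed")
    ToyTask("DoGrouping").bind(flow, "feed", upstream_tasks=[run_feed])
    return flow


if __name__ == "__main__":
    print([t.name for t in build_original().tasks], build_original().unbound())
    print([t.name for t in build_fixed().tasks], build_fixed().unbound())
```

In this model the original wiring registers RunFeed twice, once with no arguments, which is consistent with the "missing the required arg" symptom described in the thread. The fixed wiring registers each task exactly once.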