alex

08/07/2020, 3:38 PM
Hello, I have a bunch of tasks called sequentially in my flow. If any of them fails, I want to execute a failure_recovery task. What would be the most concise way of expressing this? Also, sometimes I may want to execute the failure_recovery task directly (this is specified as a param).

nicholas

08/07/2020, 4:04 PM
Hi @alex - I'd recommend you take a look at https://docs.prefect.io/core/concepts/execution.html#triggers, which deals with State-driven Task triggers; I believe this will handle your use case!

alex

08/07/2020, 5:30 PM
Thanks for the reply nicholas. I checked them out, but I feel like I'm doing something wrong. Here is a code snippet:
run_feed = RunFeed()
do_grouping = DoGrouping()
reuse_data=ReuseExistingData(trigger=triggers.any_failed)

with flow(...):
   run_feed(feed)
   do_grouping(feed, upstream_tasks=[run_feed])

   # if any failure, recover
   reuse_data(feed, upstream_tasks=[run_feed, do_grouping])
This is what my DAG looks like, which is a bit unexpected. Also, it looks like each method is being run twice, and fails the first time because run is missing the required arg feed.

nicholas

08/07/2020, 5:37 PM
@alex it looks like you're passing separate instances of your RunFeed class to each of your tasks. Try passing a single reference instead, like this:
do_grouping = DoGrouping()
reuse_data=ReuseExistingData(trigger=triggers.any_failed)

with flow(...):
   run_feed = RunFeed()(feed)
   do_grouping(feed, upstream_tasks=[run_feed])
   # if any failure, recover
   reuse_data(feed, upstream_tasks=[run_feed, do_grouping])

alex

08/07/2020, 5:39 PM
I see, thanks for your help! 🙂

nicholas

08/07/2020, 5:40 PM
yup yup! you'll want to do the same for your DoGrouping task as well