Hello, I have a bunch of tasks called sequentially...
# prefect-community
a
Hello, I have a bunch of tasks called sequentially in my flow. If any of them fails, I want to execute a failure_recovery task. What would the most concise way of expressing this be? Also sometimes I may want to execute failure_recovery task directly (this is specified as a param)
n
Hi @alex - I'd recommend you take a look at https://docs.prefect.io/core/concepts/execution.html#triggers, which deals with State-driven Task triggers; I believe this will handle your use case!
a
Thank for the reply nicholas, I checked them out but I feel like i'm doing something wrong, here is a code snippet:
Copy code
run_feed = RunFeed()
do_grouping = DoGrouping()
reuse_data=ReuseExistingData(trigger=triggers.any_failed)

with flow(...):
   run_feed(feed)
   do_grouping(feed, upstream_tasks=[run_feed])

   # if any failure, recover
   reuse_data(feed, upstream_tasks=[run_feed, do_grouping])
This is what my dag loogs like:, which is a bit unexpected. Also, it looks like each method is being run twice, and fails the first time because run is missing the required arg
feed
n
@alex it looks like you're passing separate instances of your
RunFeed
class to each of your tasks, try passing a single reference instead like this:
Copy code
do_grouping = DoGrouping()
reuse_data=ReuseExistingData(trigger=triggers.any_failed)

with flow(...):
   run_feed = RunFeed()(feed)
   do_grouping(feed, upstream_tasks=[run_feed])
   # if any failure, recover
   reuse_data(feed, upstream_tasks=[run_feed, do_grouping])
a
I see, thanks for your help! 🙂
n
yup yup! you'll want to do the same for your
DoGrouping
task as well