alex
08/07/2020, 3:38 PM
nicholas
alex
08/07/2020, 5:30 PM
run_feed = RunFeed()
do_grouping = DoGrouping()
reuse_data = ReuseExistingData(trigger=triggers.any_failed)
with flow(...):
    run_feed(feed)
    do_grouping(feed, upstream_tasks=[run_feed])
    # if any failure, recover
    reuse_data(feed, upstream_tasks=[run_feed, do_grouping])
This is what my DAG looks like, which is a bit unexpected.
Also, it looks like each method is being run twice, and it fails the first time because run() is missing the required argument feed.
nicholas
RunFeed class to each of your tasks, try passing a single reference instead like this:
do_grouping = DoGrouping()
reuse_data = ReuseExistingData(trigger=triggers.any_failed)
with flow(...):
    run_feed = RunFeed()(feed)
    do_grouping(feed, upstream_tasks=[run_feed])
    # if any failure, recover
    reuse_data(feed, upstream_tasks=[run_feed, do_grouping])
alex
08/07/2020, 5:39 PM
nicholas
DoGrouping task as well
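For anyone reading along, here is a rough toy model of why each task can show up twice in the DAG. It is not Prefect's implementation: ToyTask, ToyFlow, and summary are hypothetical stand-ins. It only assumes that calling a task instance inside the flow block produces a new copy bound to its arguments, and that anything listed in upstream_tasks is registered with the flow as well.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a flow: it only records the task objects it sees."""

    current = None  # the flow whose `with` block is currently active

    def __init__(self):
        self.tasks = []

    def __enter__(self):
        ToyFlow.current = self
        return self

    def __exit__(self, *exc):
        ToyFlow.current = None

    def add(self, task):
        # Register by identity, so a copy and its original count as two tasks.
        if not any(t is task for t in self.tasks):
            self.tasks.append(task)


class ToyTask:
    """Hypothetical stand-in for a task class such as RunFeed."""

    def __init__(self, name):
        self.name = name
        self.args = None  # stays None on an instance that was never called

    def __call__(self, *args, upstream_tasks=()):
        bound = copy.copy(self)  # assumption: calling returns a new, bound copy
        bound.args = args
        flow = ToyFlow.current
        for upstream in upstream_tasks:
            flow.add(upstream)  # upstream references are registered too
        flow.add(bound)
        return bound


def summary(flow):
    """List (task name, has arguments) pairs for every registered task."""
    return sorted((t.name, t.args is not None) for t in flow.tasks)


# Pattern from the question: upstream_tasks points at the un-called instance.
run_feed = ToyTask("RunFeed")
do_grouping = ToyTask("DoGrouping")
with ToyFlow() as flow_a:
    run_feed("feed")
    do_grouping("feed", upstream_tasks=[run_feed])

# Pattern from the reply: keep the result of the call and pass that along.
with ToyFlow() as flow_b:
    run_feed_result = ToyTask("RunFeed")("feed")
    ToyTask("DoGrouping")("feed", upstream_tasks=[run_feed_result])

print(summary(flow_a))
print(summary(flow_b))
```

In the first flow the toy model registers RunFeed twice, once with arguments and once without, which would match the symptom of a run failing because feed is missing. In the second flow each task appears once. The same reasoning would apply to DoGrouping when it is listed as an upstream of the recovery task.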