alex
08/07/2020, 3:38 PMnicholas
08/07/2020, 4:04 PMalex
08/07/2020, 5:30 PMrun_feed = RunFeed()
do_grouping = DoGrouping()
reuse_data=ReuseExistingData(trigger=triggers.any_failed)
with flow(...):
run_feed(feed)
do_grouping(feed, upstream_tasks=[run_feed])
# if any failure, recover
reuse_data(feed, upstream_tasks=[run_feed, do_grouping])
This is what my dag loogs like:, which is a bit unexpected.
Also, it looks like each method is being run twice, and fails the first time because run is missing the required arg feed
nicholas
08/07/2020, 5:37 PMRunFeed
class to each of your tasks, try passing a single reference instead like this:
do_grouping = DoGrouping()
reuse_data=ReuseExistingData(trigger=triggers.any_failed)
with flow(...):
run_feed = RunFeed()(feed)
do_grouping(feed, upstream_tasks=[run_feed])
# if any failure, recover
reuse_data(feed, upstream_tasks=[run_feed, do_grouping])
alex
08/07/2020, 5:39 PMnicholas
08/07/2020, 5:40 PMDoGrouping
task as well