alex
08/07/2020, 9:17 PM
feeds = get_feeds()
with Flow(..):
    for feed in feeds:
        run_feed = GetDataTask(name=f"GetData: {feed.feed_name}")(feed)
        latest = run_feed
        if feed.do_transform():
            transform_feed = TransformTask()(feed, upstream_tasks=[latest])
            latest = transform_feed
        # ... some more optional logic here ...
        failure_recovery = FailureRecoveryTask(trigger=any_failed)(feed, upstream_tasks=[latest])  # should handle any failure in any of above tasks
    mapped = feeds.map(
        all_feeds,
        target=unmapped("aggregated"),
    )
    mapped.set_upstream([failure_recovery])
This structure isn't giving me the DAG I'm looking for, and I was wondering if anyone could give any advice on the most "prefect" way to do this. Some questions I had:
• Should I initialize tasks before the flow, as the docs advise, or is this structure ok?
• Is the if and latest= logic advisable? Or should I run the optional transformations unconditionally and have them set a "skipped" state when they don't apply?
• How should I specify the aggregation task? Right now, the map task seems to only have a dependency on the last feed's failure_recovery_task.
Jeremiah
08/08/2020, 1:29 AM
Task()(args) looks weird to folks new to Prefect). There’s also nothing wrong with your latest logic, though the question of whether to use latest in your flow or a skipped state is one of compile-time vs run-time semantics. I think you could do either (though I may be misunderstanding your use case), and it’s more a preference of where you want the control flow to live. If possible, I recommend keeping it as-is because the Python script will be clearer.
Last, for the aggregation task, your mapped task only depends on the last task because mapped.set_upstream([failure_recovery]) is only called once, on the last failure_recovery task. To have it depend on all of them, collect all the failure recovery tasks into a list and call mapped.set_upstream(list_of_failure_recoveries).
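To make the difference concrete, here is a rough plain-Python sketch of the two wiring patterns. It does not import Prefect: FakeTask is a hypothetical stand-in that only records what gets passed to set_upstream, and the feed names are made up, so treat it as an illustration of the variable-rebinding issue rather than real flow code.

```python
# FakeTask is a hypothetical stand-in (NOT Prefect's Task class); it only
# records upstream dependencies so the two patterns can be compared.
class FakeTask:
    def __init__(self, name):
        self.name = name
        self.upstream = []

    def set_upstream(self, tasks):
        # Record every task passed in as an upstream dependency.
        self.upstream.extend(tasks)


feed_names = ["a", "b", "c"]  # made-up feeds for illustration

# Original pattern: failure_recovery is rebound on every iteration, so
# after the loop it refers only to the last feed's recovery task.
for name in feed_names:
    failure_recovery = FakeTask(f"recovery: {name}")
mapped_last_only = FakeTask("aggregate")
mapped_last_only.set_upstream([failure_recovery])

# Suggested pattern: collect each recovery task as it is created, then
# hand the whole list to the aggregation task.
failure_recoveries = []
for name in feed_names:
    failure_recoveries.append(FakeTask(f"recovery: {name}"))
mapped_all = FakeTask("aggregate")
mapped_all.set_upstream(failure_recoveries)

print([t.name for t in mapped_last_only.upstream])
print([t.name for t in mapped_all.upstream])
```

In the real flow the same idea would apply inside the for loop: append each failure_recovery to a list as it is created, and make the single set_upstream call with that list after the loop finishes.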
alex
08/10/2020, 5:39 PM