alex

08/07/2020, 9:17 PM
Hello everyone, I would really appreciate some advice on how to structure a flow that applies several different tasks to each item in a list. Here is a snippet:
```python
feeds = get_feeds()

with Flow(..):
    for feed in feeds:
        run_feed = GetDataTask(name=f"GetData: {feed.feed_name}")(feed)
        latest = run_feed

        # optional transformation, decided while the flow is being built
        if feed.do_transform():
            transform_feed = TransformTask()(feed, upstream_tasks=[latest])
            latest = transform_feed

        # ... some more optional logic here ...

        # should handle any failure in any of the above tasks
        failure_recovery = FailureRecoveryTask(trigger=any_failed)(feed, upstream_tasks=[latest])

    # aggregation step across all feeds
    mapped = feeds.map(
        all_feeds,
        target=unmapped("aggregated"),
    )
    mapped.set_upstream([failure_recovery])
```
This structure isn't giving me the DAG I'm looking for, and I was wondering if anyone could advise on the most "Prefect" way to do this. Some questions I had:
• Should I initialize tasks before the flow, as the docs advise, or is this structure OK?
• Is the `if` and `latest =` logic advisable? Or should I always run the optional transformations and have them set a "skipped" state when they don't apply?
• How should I specify the aggregation task? Right now, the mapped task seems to depend only on the last feed's `failure_recovery` task.

Jeremiah

08/08/2020, 1:29 AM
Hi Alex, nothing jumps out at me as odd about this structure. It's helpful to think of your flow-building script as pure Python and not worry too much about Prefect semantics if you don't have to. Things like where you initialize tasks don't really matter (we do it earlier in our docs just because the double call of `Task()(args)` looks weird to folks new to Prefect).

There's also nothing wrong with your `latest` logic, though the choice between using `latest` in your flow or a `Skipped` state is one of compile-time vs. run-time semantics. I think you could do either (though I may be misunderstanding your use case); it's mostly a preference about where you want the control flow to live. If possible, I recommend keeping it as-is because the Python script will be clearer.

Last, for the aggregation task, your mapped task only depends on the last task because `mapped.set_upstream([failure_recovery])` is called only once, after the loop, when `failure_recovery` refers to the last feed's task. To have it depend on all of them, collect all the failure recovery tasks into a list and call `mapped.set_upstream(list_of_failure_recoveries)`.

alex

08/10/2020, 5:39 PM
Thanks for your insights!