# prefect-contributors
alex:
Hello! I have a flow as follows:
```
results = []
for op in operations:
    op = task(op)
    most_recent = op

    # ... a few more conditional tasks defined at compile time

    if do_recovery:
        recovery_task = recover(trigger=any_failed_else_raise_skip_signal)
        most_recent = recovery_task
    results.append(most_recent)

backup_task = Backup('mydatabasename', upstream_tasks=[results], skip_on_upstream_skip=False)
mapped = Aggregate.map(
    results,
    target=unmapped('mydatabasename')
)
mapped.set_upstream(backup_task)
```
As you can see from the image, instead of getting one list I have two lists, and because of this my Aggregate task is being skipped even though it is downstream of a task with `skip_on_upstream_skip=False`. Is there a way to cleanly resolve this (i.e. only get one list)?
nicholas:
Hi @alex - can you post your full code (or a more complete minimal reproducible example)? I'm seeing 3 references created to the same list (`op`, `most_recent`, and `result`)
alex:
Hi @nicholas, thanks for reaching out! Here's a minimal reproducible example, with some more context about what I'm trying to achieve at the bottom of the script. Please let me know if there's any more information I can provide.
```
import copy
import datetime

import prefect
from prefect import Flow, task
from prefect.triggers import _get_all_states_as_set


class Feed:
    def __init__(self, name, do_alpha, raise_an_err=None):
        self.name = name
        self.do_alpha = do_alpha
        self.do_bravo = True
        self.date = datetime.datetime.today().strftime("%Y%m%d")
        self.raise_an_err = raise_an_err

    def update_date(self, datenew):
        self.date = datenew

    def get_collection(self):
        return self.name + "_collection_" + self.date


@task(log_stdout=True)
def do_alpha(feed):
    print(feed.get_collection() + " did alpha")
    return feed


@task(log_stdout=True)
def do_bravo(feed):
    if feed.raise_an_err:
        raise Exception(feed.get_collection() + " could not do bravo")
    print(feed.get_collection() + " did bravo")
    return feed


@task(log_stdout=True)
def refresh_date(feed):
    feed.update_date(prefect.context.today_nodash)
    return feed


@task(log_stdout=True)
def aggregate(feed):
    print(f"aggregated {feed.get_collection()}")


@task(log_stdout=True)
def prepare_for_aggregation():
    print("prepared")


def run_on_any_failed_or_skip_trigger(upstream_states):
    """
    If any of the upstream tasks failed, run this task.
    If not, raise skip signal
    """
    if any(s.is_failed() for s in _get_all_states_as_set(upstream_states)):
        return True
    else:
        raise prefect.engine.signals.SKIP()


@task(log_stdout=True, trigger=run_on_any_failed_or_skip_trigger)
def failure_recovery(feed):
    print(f"recovered {feed.get_collection()}")
    return feed


@task(log_stdout=True)
def copy_task(o):
    return copy.deepcopy(o)


feed_a, feed_b = Feed("a", do_alpha=False), Feed("b", do_alpha=True, raise_an_err=True)
params = {
    feed_a: {"do_alpha": feed_a.do_alpha, "do_bravo": feed_a.do_bravo},
    feed_b: {"do_alpha": feed_b.do_alpha, "do_bravo": feed_b.do_bravo},
}
feeds = [feed_a, feed_b]

recovery_enabled = True


def get_flow():
    with Flow("Demo Flow") as f:
        results = []
        for feed in feeds:
            run_params = params[feed]
            original_feed = copy_task(feed)

            feed = refresh_date(feed)

            if run_params["do_alpha"]:
                feed = do_alpha(feed)

            if run_params["do_bravo"]:
                feed = do_bravo(feed)

            # some more similar logic (do_charlie etc..)

            if recovery_enabled:
                feed = failure_recovery(
                    original_feed, upstream_tasks=[feed]
                )  # can't just pass feed, as if any of the above tasks failed, feed's value will be a FailedTrigger(?)

            results.append(feed)

        # wait until all feeds have run
        prep = prepare_for_aggregation(
            upstream_tasks=[results],
            task_args=dict(
                skip_on_upstream_skip=False
            ),  # want to make sure we still run if we skipped failure recovery
        )

        agg = aggregate.map(results)
        agg.set_upstream(prep)
    return f


if __name__ == "__main__":
    flow = get_flow()
    # flow.run()
    flow.register('cf')

"""
This used to be a cli application before I transitioned to prefect.

Feeds contain information about how to run them and are defined at compile time (ie. they won't be changed through 
eg.parameters, and also provide a method used by all tasks: get_collection, which is based on the current date, 
so that needs to be injected every time a flow runs, otherwise it will still use the 
same date every day.

At the end, I want to backup my final collection and insert data from all feed collections into it.
"""
nicholas:
Ah ok so - I think you got some runtime vs build time principles mixed up. For example:
```
results = []
for feed in feeds:
    run_params = params[feed]
    original_feed = copy_task(feed)
    feed = refresh_date(feed)
    if run_params["do_alpha"]:
        feed = do_alpha(feed)
    if run_params["do_bravo"]:
        feed = do_bravo(feed)
    # some more similar logic (do_charlie etc..)
    if recovery_enabled:
        feed = failure_recovery(
            original_feed, upstream_tasks=[feed]
        )  # can't just pass feed, as if any of the above tasks failed, feed's value will be a FailedTrigger(?)
    results.append(feed)
```
This section of your flow context is appending to `results` at build time. What this means is that it would look something like this:
```
# results
[<Task: failure_recovery>, <Task: failure_recovery>]
```
This means that downstream tasks that try to access that list at runtime are trying to work with Task objects instead of the results of your tasks, as you expect. Typically the way I think about building flows is that anything in the `with Flow() as flow:` block should read as if you're describing the structure of your application; anything in a task body should read as describing runtime logic. So you might build your flow something like this:
```
import datetime

from prefect import Flow, task, unmapped


@task
def get_feeds():
    return [
        {"name": "a", "do_alpha": False},
        {"name": "a", "do_alpha": False, "raise_an_err": True},
    ]


@task
def get_collection(feed, date):
    return feed["name"] + "_collection_" + date


@task
def get_formatted_date():
    return datetime.datetime.today().strftime("%Y%m%d")


@task(log_stdout=True)
def do_alpha(feed, collection):
    print(collection + " did alpha")
    return feed


@task(log_stdout=True)
def do_bravo(feed, collection):
    if feed.get("raise_an_err"):
        raise Exception(collection + " could not do bravo")
    print(collection + " did bravo")
    return feed


@task
def filter_feeds(feeds, ref):
    return [feed for feed in feeds if feed["name"] == ref]


@task
def merge_feeds(*args):
    feeds = []
    for feed in args:
        feeds.extend(feed)
    return feeds


with Flow("Demo flow") as flow:
    # get the feeds we want to iterate over
    feeds = get_feeds()

    # get the formatted date
    # (note that this is also in the flow run context that you can access in a task with prefect.context)
    today = get_formatted_date()

    collections = get_collection.map(feeds, unmapped(today))

    # create branches from your feeds that you perform some operations on
    alpha_feeds = filter_feeds(feeds, "a")
    bravo_feeds = filter_feeds(feeds, "b")

    # merge the data sets back together
    merged_feeds = merge_feeds(alpha_feeds, bravo_feeds)

    # prepare_for_aggregation(upstream_tasks=[merged_feeds])
    # etc etc
```
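For reference, a minimal sketch (assuming Prefect 0.x; the task names and feed values are placeholders) of how the `skip_on_upstream_skip=False` step from the original question can hang off a single runtime-built list, so the mapped aggregation receives runtime values instead of Task objects:
```
from prefect import Flow, task


@task
def get_feeds():
    # placeholder feed data, returned as ONE list at runtime
    return ["a", "b"]


@task(skip_on_upstream_skip=False, log_stdout=True)
def prepare_for_aggregation():
    # runs even if an upstream recovery task raised SKIP
    print("prepared")


@task(log_stdout=True)
def aggregate(feed):
    print(f"aggregated {feed}")


with Flow("aggregation sketch") as flow:
    feeds = get_feeds()  # one list, materialized at runtime
    prep = prepare_for_aggregation(upstream_tasks=[feeds])
    agg = aggregate.map(feeds)  # maps over the runtime list
    agg.set_upstream(prep)  # gate aggregation on the prep step
```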
alex:
Thanks for your reply, that really helped clear things up for me!
nicholas:
Glad to help! šŸ˜„