Thread
#prefect-contributors
    alex

    2 years ago
    Hello! I have a flow that looks roughly like this:
    results = []
    for op in operations:
        op = task(op)
        most_recent = op

        # ... a few more conditional tasks defined at compile time

        if do_recovery:
            recovery_task = recover(trigger=any_failed_else_raise_skip_signal)
            most_recent = recovery_task
        results.append(most_recent)

    backup_task = Backup('mydatabasename', upstream_tasks=[results], skip_on_upstream_skip=False)
    mapped = Aggregate.map(
        results,
        target=unmapped('mydatabasename')
    )
    mapped.set_upstream([backup_task])
    As you can see from the image, instead of getting one list I have two lists, and because of this my Aggregate task is being skipped even though it is downstream of a skip_on_upstream_skip=False task. Is there a way to cleanly resolve this (i.e. only get one list)?
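    For reference, here's a minimal sketch (not my real code) of the behaviour I'm relying on: a task configured with skip_on_upstream_skip=False should still run even when an upstream task raised SKIP.
    from prefect import Flow, task
    from prefect.engine.signals import SKIP


    @task
    def maybe_skip():
        # stands in for my recovery task when its trigger raises the skip signal
        raise SKIP("nothing to recover")


    @task(skip_on_upstream_skip=False, log_stdout=True)
    def still_runs():
        print("runs even though the upstream task skipped")


    with Flow("skip-demo") as f:
        still_runs(upstream_tasks=[maybe_skip])

    # f.run()  # still_runs executes even though maybe_skip ends in a Skipped state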
    nicholas

    2 years ago
    Hi @alex - can you post your full code (or a more complete minimal reproducible example)? I'm seeing 3 references created to the same list (op, most_recent, and result)
    alex

    2 years ago
    Hi @nicholas, thanks for reaching out! Here's a minimal reproducible example of what I'm trying to achieve; I've added some more context at the bottom of the script if that helps. Please let me know if there's any more information I can provide.
    import copy
    import datetime
    
    import prefect
    from prefect import Flow, task
    from prefect.triggers import _get_all_states_as_set
    
    
    class Feed:
        def __init__(self, name, do_alpha, raise_an_err=None):
            self.name = name
            self.do_alpha = do_alpha
            self.do_bravo = True
            self.date = datetime.datetime.today().strftime("%Y%m%d")
            self.raise_an_err = raise_an_err
    
        def update_date(self, datenew):
            self.date = datenew
    
        def get_collection(self):
            return self.name + "_collection_" + self.date
    
    
    @task(log_stdout=True)
    def do_alpha(feed):
        print(feed.get_collection() + " did alpha")
        return feed
    
    
    @task(log_stdout=True)
    def do_bravo(feed):
        if feed.raise_an_err:
            raise Exception(feed.get_collection() + " could not do bravo")
        print(feed.get_collection() + " did bravo")
        return feed
    
    
    @task(log_stdout=True)
    def refresh_date(feed):
        feed.update_date(prefect.context.today_nodash)
        return feed
    
    
    @task(log_stdout=True)
    def aggregate(feed):
        print(f"aggregated {feed.get_collection()}")
    
    
    @task(log_stdout=True)
    def prepare_for_aggregation():
        print("prepared")
    
    
    def run_on_any_failed_or_skip_trigger(upstream_states):
        """
        If any of the upstream tasks failed, run this task.
        If not, raise skip signal
        """
        if any(s.is_failed() for s in _get_all_states_as_set(upstream_states)):
            return True
        else:
            raise prefect.engine.signals.SKIP()
    
    
    @task(log_stdout=True, trigger=run_on_any_failed_or_skip_trigger)
    def failure_recovery(feed):
        print(f"recovered {feed.get_collection()}")
        return feed
    
    
    @task(log_stdout=True)
    def copy_task(o):
        return copy.deepcopy(o)
    
    
    feed_a, feed_b = Feed("a", do_alpha=False), Feed("b", do_alpha=True, raise_an_err=True)
    params = {
        feed_a: {"do_alpha": feed_a.do_alpha, "do_bravo": feed_a.do_bravo},
        feed_b: {"do_alpha": feed_b.do_alpha, "do_bravo": feed_b.do_bravo},
    }
    feeds = [feed_a, feed_b]
    
    recovery_enabled = True
    def get_flow():
        with Flow("Demo Flow") as f:
            results = []
            for feed in feeds:
                run_params = params[feed]
                original_feed = copy_task(feed)
    
                feed = refresh_date(feed)
    
                if run_params["do_alpha"]:
                    feed = do_alpha(feed)
    
                if run_params["do_bravo"]:
                    feed = do_bravo(feed)
    
                # some more similar logic (do_charlie etc..)
    
                if recovery_enabled:
                    feed = failure_recovery(
                        original_feed, upstream_tasks=[feed]
                    )  # can't just pass feed: if any of the above tasks failed, feed's value would be a failed result(?)
    
                results.append(feed)
    
            # wait until all feeds have run
            prep = prepare_for_aggregation(
                upstream_tasks=[results],
                task_args=dict(
                    skip_on_upstream_skip=False
                ),  # want to make sure we still run if we skipped failure recovery
            )
    
            agg = aggregate.map(results)
            agg.set_upstream([prep])
        return f
    
    
    if __name__ == "__main__":
        flow = get_flow()
        # flow.run()
        flow.register('cf')
    
    """
    This used to be a cli application before I transitioned to prefect.
    
    Feeds contain information about how to run them and are defined at compile time (ie. they won't be changed through 
    eg.parameters, and also provide a method used by all tasks: get_collection, which is based on the current date, 
    so that needs to be injected every time a flow runs, otherwise it will still use the 
    same date every day.
    
    At the end, I want to backup my final collection and insert data from all feed collections into it.
    """
    nicholas

    2 years ago
    Ah ok, so - I think you've got some runtime vs. build time principles mixed up. For example:
    results = []
    for feed in feeds:
        run_params = params[feed]
        original_feed = copy_task(feed)
        feed = refresh_date(feed)
        if run_params["do_alpha"]:
            feed = do_alpha(feed)
        if run_params["do_bravo"]:
            feed = do_bravo(feed)
        # some more similar logic (do_charlie etc..)
        if recovery_enabled:
            feed = failure_recovery(
                original_feed, upstream_tasks=[feed]
            )  # can't just pass feed: if any of the above tasks failed, feed's value would be a failed result(?)
        results.append(feed)
    This section of your flow context is appending to results at build time. What this means is that it would look something like this:
    # results
    [<Task: failure_recovery>, <Task: failure_recovery>]
    This means that downstream tasks that try to access that at runtime are trying to work with Task objects, instead of the results of your tasks as you expect. Typically the way I think about building flows is that anything in the with Flow() as flow: block should read as if you're describing the structure of your application; anything in a task should read as describing runtime logic. So you might build your flow something like this:
    import datetime

    from prefect import Flow, task, unmapped
    
    
    @task
    def get_feeds():
        return [
            {"name": "a", "do_alpha": False},
            {"name": "b", "do_alpha": True, "raise_an_err": True},
        ]
    
    
    @task
    def get_collection(feed, date):
        # the feeds are plain dicts here, so use key access
        return feed["name"] + "_collection_" + date
    
    
    @task
    def get_formatted_date():
        return datetime.datetime.today().strftime("%Y%m%d")
    
    
    @task(log_stdout=True)
    def do_alpha(feed, collection):
        print(collection + " did alpha")
        return feed
    
    
    @task(log_stdout=True)
    def do_bravo(feed, collection):
        if feed.get("raise_an_err"):
            raise Exception(collection + " could not do bravo")
        print(collection + " did bravo")
        return feed
    
    
    @task
    def filter_feeds(feeds, ref):
        # return a list rather than a lazy filter object so it can be passed between tasks
        return [feed for feed in feeds if feed["name"] == ref]
    
    
    @task
    def merge_feeds(*args):
        feeds = []
        for feed in args:
            feeds.extend(feed)
        return feeds
    
    
    with Flow("Demo flow") as flow:
        # get the feeds we want to iterate over
        feeds = get_feeds()
    
        # get the formatted date
        # (note that this is also in the flow run context that you can access in a task with prefect.context)
        today = get_formatted_date()
    
        collections = get_collection.map(feeds, unmapped(today))
    
        # create branches from your feeds that you perform some operations on
        alpha_feeds = filter_feeds(feeds, "a")
        bravo_feeds = filter_feeds(feeds, "b")
    
        # merge the data sets back together
        merged_feeds = merge_feeds(alpha_feeds, bravo_feeds)  # positional, since merge_feeds takes *args
    
        # prepare_for_aggregation(upstream_tasks=[merged_feeds])
        # etc etc
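    The key design choice here is that feeds becomes the result of a task rather than a build-time Python list, so filtering, merging, and mapping all happen on real data at runtime. If you still need the recovery/skip behaviour, you can keep setting it per call with task_args=dict(skip_on_upstream_skip=False), the way your prepare_for_aggregation already does.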
    alex

    2 years ago
    Thanks for your reply, that really helped clear things up for me!
    nicholas

    2 years ago
    Glad to help! šŸ˜„