Thread
#prefect-community
    alex

    2 years ago
    Hello everyone, I'm looking to try out Prefect for one of our pipelines. We have multiple "sources" of two high-level types: base sources and parent sources. A parent source has a list of child sources. We may add or remove sources based on a config. Each source has a run() method; calling run() on a parent source calls run() on each of its children. Currently, all the top-level sources are called sequentially and then aggregated at the end. Does anyone have ideas on the most "prefectic" way to structure this to maximize parallelization and conciseness? One way is to define a task for each high-level source, but this seems a bit tedious, and we also wouldn't be able to parallelize the children. Another approach I tried is something like this:
    @task
    def run_for_child(child):
        child.run(max_items=50)
        return child.collection_name
    
    @task
    def run_source(source):
        cond = source.is_parent
        with case(cond, True):
            res1 = run_for_child.map(source.children)
        with case(cond, False):
            source.run(max_items=50)
            res2 = source.collection_name
        return merge(res1, res2)
    
    @task 
    def get_highlevel_sources(config):
        # return list[Source] based off config
        ...
    
    # In the flow, get the sources and do run_source.map. At the end, aggregate all collections
    This gave me a "could not infer active flow context" error, and I'm not sure if this is the best way to structure this anyway.
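The "could not infer active flow context" error most likely comes from calling run_for_child.map(...) (together with case and merge) inside the body of a running task: those constructs build the flow graph and need an active Flow, which exists only inside the `with Flow(...)` block. Inside a task body, ordinary Python control flow is enough. Below is a minimal sketch of that idea, assuming a hypothetical Source class that stands in for the real sources (its fields are guesses based on the snippet above). In Prefect the function would carry the @task decorator. Note that the children run sequentially inside one task here, so this variant does not parallelize them.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Source:
    """Hypothetical stand-in for the thread's source objects."""
    collection_name: str
    children: List["Source"] = field(default_factory=list)

    @property
    def is_parent(self) -> bool:
        return bool(self.children)

    def run(self, max_items: int = 50) -> None:
        # Placeholder for the real ingestion work.
        pass


def run_source(source: Source, max_items: int = 50) -> List[str]:
    """Body of a would-be @task: plain if/else instead of case/merge."""
    if source.is_parent:
        # Children run one after another inside this single task.
        for child in source.children:
            child.run(max_items=max_items)
        return [child.collection_name for child in source.children]
    source.run(max_items=max_items)
    return [source.collection_name]
```

Returning a list in both branches keeps the aggregation step simple, since it only ever has to concatenate lists of collection names.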
    Kyle Moon-Wright

    2 years ago
    Hey @alex! I'll get some information for you.
    Hey @alex, let me know if this is way off; I just want to contextualize what you're trying to accomplish and hopefully provide some direction. I think the best way to organize your flow is to use two sets of mapping in the flow context rather than at the task level.
    In that case we can restructure your flow to look like this (courtesy of @nicholas):
    from prefect import Flow, task
    
    @task
    def run_parent_feeds(parent):
        # do other stuff
        return parent.run()
    
    @task
    def run_child_feeds(child):
        # do other stuff
        return child.run()
    
    @task
    def merge_feeds(feed1, feed2):
        return feed1 + feed2
    
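    @task
    def get_parents(sources):
        # keep only the parent sources
        return [source for source in sources if source.is_parent]
    
    @task
    def get_children(sources):
        # keep only the non-parent sources
        return [source for source in sources if not source.is_parent]
    
    @task
    def get_sources(config):
        # return some mix of parents/children sources
        ...
    
    with Flow("feed flow") as flow:
        config = {"some_config": "some_value"}
        sources = get_sources(config)
    
        # filter the whole list in one task so the results can be mapped over below
        parents = get_parents(sources)
        children = get_children(sources)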
    
        parent_feeds = run_parent_feeds.map(parents)
        child_feeds = run_child_feeds.map(children)
        merged_feeds = merge_feeds(parent_feeds, child_feeds)
    If you have more questions, let us know!
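To give every child its own mapped run (the parallelism asked about in the original question), one option is to flatten the source list before mapping. The sketch below shows only the flattening logic in plain Python. The Source class and the expand_sources name are hypothetical, and a single level of nesting is assumed, as described in the question.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Source:
    """Hypothetical stand-in; the real sources are not shown in the thread."""
    name: str
    children: List["Source"] = field(default_factory=list)


def expand_sources(sources: List[Source]) -> List[Source]:
    """Replace every parent by its children, keeping base sources as-is."""
    runnable: List[Source] = []
    for source in sources:
        if source.children:
            # A parent contributes its children instead of itself.
            runnable.extend(source.children)
        else:
            runnable.append(source)
    return runnable
```

Wrapped in a task, the flat list it returns could feed a single .map call, so base sources and children would all be scheduled as individual mapped runs.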
    alex

    2 years ago
    Thank you @Kyle Moon-Wright and @nicholas! I was able to get a similar version working perfectly. I have another question: in the DAG overview page, I lose information such as the feed name. Also, ideally I would like to visualize a parent_feed splitting into the child_feeds. What would be the best way to achieve this?
    nicholas

    2 years ago
    @alex I think that's to be expected since the feed name would come from your code/data, no?
    alex

    2 years ago
    I mean previously, when I did this for a subset of my feeds and referenced them explicitly in the flow
    @task
    def my_task():
        feed = Feed()
        feed.run(max_items=50)
        return feed.name
    I was able to see my_task -> etc. Ideally, for the shared example, I'd like to visualize all the feeds (so I can check their status, since they may run for different lengths of time, get stuck, etc.) instead of just seeing a Constant[list]. 😬
    nicholas

    2 years ago
    Ah I see - when you run that flow, you'll see a somewhat different visualization than the schematic above, which will show a horizontal bar underneath the task name for your mapped tasks. Clicking on that task will show you the states of all your mapped task runs!
    The picture above just shows the schematic of your flow, i.e. what you registered with Prefect. Since the runtime iterates over a dynamic list (which Prefect doesn't know about until the runs happen), the view of an actual flow run will be more descriptive than the bare schematic.
    alex

    2 years ago
    I see. Is there any way to still show information about mapped tasks (e.g. maybe I could map over (feed, description) pairs, or implement a __repr__ method) so that a description is shown? For example, one of the feeds below took a bit longer than the others, but it is difficult to identify which one it is.
    I guess, based on the examples, map is ideal for applying a task to primitive items like a list of numbers or filenames. In my case, being able to get information on feeds at a glance would be very helpful. Would you be able to give me any guidance on how I could achieve something like that with minimal code changes? For reference, I have around 20 feeds.
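The (feed, description) idea can be sketched in plain Python as below. The thread does not confirm that the UI would display such a description, so this only makes the feed name available in the logs and in each mapped result. The run_labelled name, the "feeds" logger name, and the shape of the returned dict are assumptions for illustration.

```python
import logging
import time
from typing import Callable, Dict, Tuple

logger = logging.getLogger("feeds")  # hypothetical logger name


def run_labelled(item: Tuple[str, Callable[[], object]]) -> Dict[str, object]:
    """Run one (description, runner) pair and report which feed it was."""
    description, runner = item
    started = time.perf_counter()
    result = runner()
    elapsed = time.perf_counter() - started
    # The description travels with the timing, so a slow feed can be named.
    logger.info("feed %s finished in %.2fs", description, elapsed)
    return {"feed": description, "seconds": elapsed, "result": result}
```

Mapping a task with this body over a list of (description, runner) pairs would let a slow run be traced back to its feed from the log line or the returned record.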
    nicholas

    2 years ago
    You're correct @alex - we have an open issue that I think gets at what you're describing here. I'd encourage you to add your use case to the issue; it helps a lot as we start to design and scope the new feature 🙂 You can find the issue here: https://github.com/PrefectHQ/prefect/issues/2100
    Pedro Machado

    2 years ago
    I have a similar use case and added a comment on the issue above. Is there a workaround in the meantime?