
alex

07/06/2020, 11:24 PM
Hello everyone, I'm looking to try out Prefect for one of our pipelines. We have multiple "sources" of two high-level types: base sources and parent sources. Parent sources have a list of child sources. We may add or remove sources based on a config. Each source has a `run` method; calling `run` on a parent source calls the `run` method of each of its children. Currently, all the top-level sources are run sequentially and then aggregated. Does anyone have ideas on the most "prefectic" way to structure this to maximize parallelization and conciseness? One option is to define a task for each high-level source, but that seems tedious, and we still wouldn't be able to parallelize the children. Another approach I tried is something like this:
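```python
from prefect import task
from prefect.tasks.control_flow import case, merge


@task
def run_for_child(child):
    child.run(max_items=50)
    return child.collection_name


@task
def run_source(source):
    # NOTE: case() and .map() are flow-building calls that need an active
    # flow context; used inside a task that is already running, they fail
    # (see the error below).
    cond = source.is_parent
    with case(cond, True):
        res1 = run_for_child.map(source.children)
    with case(cond, False):
        source.run(max_items=50)
        res2 = source.collection_name
    return merge(res1, res2)


@task
def get_highlevel_sources(config):
    # return list[Source] based on config
    ...


# In the flow, get the sources and call run_source.map(sources).
# At the end, aggregate all collections.
```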
This gave me a `could not infer active flow context` error, and I'm not sure this is the best way to structure it anyway.
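`case(...)` and `Task.map(...)` build the flow graph, so they only work while a flow is being defined inside `with Flow(...)`. Calling them from inside `run_source`, which executes at flow-run time, most likely leaves them with no flow to attach to, which fits the error above. Inside a task body the branching is just ordinary Python. A minimal sketch, assuming sources expose `is_parent`, `children`, `run(max_items=...)` and `collection_name` as described in the question; `Source` and `ParentSource` are hypothetical stand-ins, and `run_source` is shown undecorated so it runs without Prefect (in a flow it would carry `@task`).

```python
from typing import List


class Source:
    """Hypothetical stand-in for a base source."""

    is_parent = False

    def __init__(self, collection_name: str) -> None:
        self.collection_name = collection_name
        self.children: List["Source"] = []
        self.last_max_items = None  # records the last max_items passed to run()

    def run(self, max_items: int = 50) -> None:
        # Placeholder for the real fetch-and-store work.
        self.last_max_items = max_items


class ParentSource(Source):
    """Hypothetical parent source; its own run() fans out to the children."""

    is_parent = True

    def __init__(self, collection_name: str, children: List[Source]) -> None:
        super().__init__(collection_name)
        self.children = children

    def run(self, max_items: int = 50) -> None:
        # Sequential fan-out, as described in the question.
        for child in self.children:
            child.run(max_items=max_items)


def run_source(source: Source, max_items: int = 50) -> List[str]:
    """Plain-Python body for a task that runs one top-level source.

    An if/else replaces case(...) and a loop replaces run_for_child.map(...).
    Both branches return a list so the results aggregate uniformly.
    """
    if source.is_parent:
        names = []
        for child in source.children:
            child.run(max_items=max_items)
            names.append(child.collection_name)
        return names
    source.run(max_items=max_items)
    return [source.collection_name]
```

This avoids the error, but a parent's children still run one after another inside a single task run; running them in parallel needs the mapping to happen at the flow level.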

Kyle Moon-Wright

07/06/2020, 11:49 PM
Hey @alex! I'll get some information for you.
Hey @alex, let me know if this is way off; I just want to contextualize what you're trying to accomplish and hopefully provide some direction. I think the best way to organize your flow is to use two sets of mapping in the flow context rather than at the task level.
In that case, we can restructure your flow like this (courtesy of @nicholas):
```python
from prefect import Flow, task


@task
def run_parent_feeds(parent):
    # do other stuff here, before returning
    return parent.run()


@task
def run_child_feeds(child):
    # do other stuff here, before returning
    return child.run()


@task
def merge_feeds(feed1, feed2):
    # Both inputs are lists of mapped results, so + concatenates them.
    return feed1 + feed2


@task
def get_parents(feeds):
    # Filter the whole list (not mapped): keep only parent sources.
    return [feed for feed in feeds if feed.is_parent]


@task
def get_children(feeds):
    # Filter the whole list (not mapped): keep only non-parent sources.
    return [feed for feed in feeds if not feed.is_parent]


@task
def get_sources(config):
    # return some mix of parent/child sources
    ...


with Flow("feed flow") as flow:
    config = {"some_config": "some_value"}
    sources = get_sources(config)

    parents = get_parents(sources)
    children = get_children(sources)

    parent_feeds = run_parent_feeds.map(parents)
    child_feeds = run_child_feeds.map(children)
    merged_feeds = merge_feeds(parent_feeds, child_feeds)
```
If you have more questions, let us know!
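With two `map` calls, each parent still runs its own children sequentially inside one mapped run. One way to parallelize the children too (an assumption, not something suggested in the thread) is to expand every parent into its children before mapping, so a single run task is mapped over a flat list of leaf sources. The helper below, `expand_to_leaves`, is a hypothetical name written as plain Python; it assumes each source has `is_parent` and, for parents, `children`.

```python
from typing import Iterable, List


def expand_to_leaves(sources: Iterable) -> List:
    """Replace every parent source with its children, recursively.

    Non-parent sources pass through unchanged, and order is preserved.
    Recursion is harmless for the two-level case and also covers nesting.
    """
    leaves = []
    for source in sources:
        if source.is_parent:
            leaves.extend(expand_to_leaves(source.children))
        else:
            leaves.append(source)
    return leaves
```

Wrapped with `@task` in the flow, something like `run_leaf.map(expand_to_leaves(sources))` (with `run_leaf` a hypothetical task that runs one source) would give one mapped run per child feed, so each feed's state also shows up individually at runtime.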

alex

07/07/2020, 12:45 AM
Thank you @Kyle Moon-Wright and @nicholas! I was able to get a similar version working perfectly. I have another question: on the DAG overview page, I lose information such as the feed name. Ideally, I would also like to visualize a parent_feed splitting into its child_feeds. What would be the best way to achieve this?

nicholas

07/07/2020, 12:46 AM
@alex I think that's to be expected since the feed name would come from your code/data, no?

alex

07/07/2020, 12:51 AM
I mean that previously, when I did this for a subset of my feeds and referenced them explicitly in the flow, like this:
```python
from prefect import task


@task
def my_task():
    feed = Feed()  # Feed is our own source class (not part of Prefect)
    feed.run(max_items=50)
    return feed.name
```
I was able to see `my_task` -> etc. For the shared example, ideally I'd be able to visualize all the feeds (so I can check their status, since they might run for different lengths of time or get stuck) instead of just seeing a Constant[list]. 😬

nicholas

07/07/2020, 12:55 AM
Ah, I see. When you run that flow, you'll see a somewhat different visualization from the schematic above: there will be a horizontal bar underneath the task name for each mapped task. Clicking on that task will show you the states of all your mapped task runs!
The picture above just shows the schematic of your flow, i.e. what you registered with Prefect. Since the runtime will be iterating over a dynamic list (which Prefect doesn't know about until the runs happen), the run view will be more descriptive than the bare schematic.

alex

07/07/2020, 1:29 AM
I see. Is there any way to still show information about mapped tasks (e.g. maybe I could map over (feed, description) pairs, or implement a `__repr__` method) to show a description? For example, one of the feeds below took a bit longer than the others, but it's difficult to tell which one it is.
Based on the examples, I guess `map` is ideal for applying a task to primitive items like a list of numbers or filenames. In my case, being able to get information on feeds at a glance would be very helpful. Could you give me any guidance on how to achieve something like that with minimal code changes? For reference, I have around 20 feeds.
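Alex's two ideas can be sketched in plain Python: give each feed a readable `__repr__`, and pair each feed with a short label before mapping so the mapped task receives both. This only makes a feed identifiable wherever the object is printed or logged (for example, if the mapped task logs `repr(feed)` when it starts); the thread does not confirm that the Prefect UI uses either to label mapped runs, which is what the linked issue is about. `Feed`, `name`, and `describe_feeds` here are hypothetical.

```python
from typing import List, Tuple


class Feed:
    """Hypothetical feed with a readable repr for logs and debugging."""

    def __init__(self, name: str, max_items: int = 50) -> None:
        self.name = name
        self.max_items = max_items

    def __repr__(self) -> str:
        return f"Feed(name={self.name!r}, max_items={self.max_items})"


def describe_feeds(feeds: List[Feed]) -> List[Tuple[Feed, str]]:
    """Pair each feed with a short label so a mapped task receives both."""
    return [(feed, f"feed:{feed.name}") for feed in feeds]
```

For example, `repr(Feed("news", 10))` gives `Feed(name='news', max_items=10)`.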

nicholas

07/07/2020, 2:06 AM
You're correct @alex. We have an open issue that I think gets at what you're describing here; I'd encourage you to add your use case to it, since that helps a lot as we start to design and scope the new feature 🙂 You can find the issue here: https://github.com/PrefectHQ/prefect/issues/2100

Pedro Machado

07/07/2020, 4:17 AM
I have a similar use case and added a comment on the issue above. Is there a workaround in the meantime?