alex
07/06/2020, 11:24 PM
`run` method, calling `run` on a parent source calls the `run` method for each of its children. Currently, all the top-level sources are run sequentially and then aggregated at the end. Does anyone have ideas on the most "prefectic" way to structure this to maximize parallelization and conciseness?
One way is to define a task for each high-level source, but this seems a bit tedious, and we also wouldn't be able to parallelize the children. Another approach I tried is something like this:
@task
def run_for_child(child):
    child.run(max_items=50)
    return child.collection_name

@task
def run_source(source):
    # `case`, `.map` and `merge` build flow structure, so calling them
    # inside a task's body (where no Flow context is active) fails
    cond = source.is_parent
    with case(cond, True):
        res1 = run_for_child.map(source.children)
    with case(cond, False):
        source.run(max_items=50)
        res2 = source.collection_name
    return merge(res1, res2)

@task
def get_highlevel_sources(config):
    # return list[Source] based on config
    ...

# In the flow, get the sources and call run_source.map. At the end, aggregate all collections.
This gave me a `Could not infer an active Flow context` error, and I'm not sure this is the best way to structure it anyway.
Kyle Moon-Wright
07/06/2020, 11:49 PM
from prefect import Flow, task


@task
def run_parent_feeds(parent):
    # do other stuff here, then return the result
    return parent.run()


@task
def run_child_feeds(child):
    # do other stuff here, then return the result
    return child.run()


@task
def merge_feeds(feed1, feed2):
    return feed1 + feed2


@task
def get_parents(feeds):
    # keep only the parent sources
    return [feed for feed in feeds if feed.is_parent]


@task
def get_children(feeds):
    # keep only the non-parent sources
    return [feed for feed in feeds if not feed.is_parent]


@task
def get_sources(config):
    # return some mix of parent/child sources
    raise NotImplementedError


with Flow("feed flow") as flow:
    config = {"some_config": "some_value"}
    sources = get_sources(config)
    parents = get_parents(sources)
    children = get_children(sources)
    parent_feeds = run_parent_feeds.map(parents)
    child_feeds = run_child_feeds.map(children)
    merged_feeds = merge_feeds(parent_feeds, child_feeds)
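As alex noted, running a parent calls `run` on each of its children in turn, so splitting sources into parents and children still leaves a parent's children unparallelized. One way around that, sketched here under assumptions, is to flatten the source tree into its leaf sources first and then fan out over that single list, which also removes the need for `case`/`merge`. The `Source` class and its `children`, `is_parent`, `collection_name` and `run(max_items=...)` members are hypothetical stand-ins modeled on alex's snippet, and `concurrent.futures.ThreadPoolExecutor` is swapped in for Prefect's mapped execution only so the example runs on its own. In a flow, `flatten_sources` would presumably become a task and `run_leaf` would be mapped over its result.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import List


@dataclass
class Source:
    # Hypothetical stand-in for alex's sources: a parent holds children,
    # a leaf has no children and does the actual fetching.
    collection_name: str
    children: List["Source"] = field(default_factory=list)

    @property
    def is_parent(self) -> bool:
        return bool(self.children)

    def run(self, max_items: int = 50) -> None:
        # Placeholder for the real fetch/store work.
        pass


def flatten_sources(sources: List[Source]) -> List[Source]:
    """Expand every parent into its leaf descendants, preserving order."""
    leaves: List[Source] = []
    for source in sources:
        if source.is_parent:
            leaves.extend(flatten_sources(source.children))
        else:
            leaves.append(source)
    return leaves


def run_leaf(source: Source) -> str:
    source.run(max_items=50)
    return source.collection_name


def run_all(sources: List[Source], max_workers: int = 8) -> List[str]:
    # One unit of work per leaf, so a parent's children run in parallel
    # with every other source; pool.map keeps results in input order.
    leaves = flatten_sources(sources)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_leaf, leaves))


if __name__ == "__main__":
    sources = [
        Source("news", children=[Source("news_a"), Source("news_b")]),
        Source("blog"),
    ]
    print(run_all(sources))  # ['news_a', 'news_b', 'blog']
```

Keeping one unit of work per leaf is also what would let each feed appear as its own mapped run instead of being hidden inside its parent's task.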
alex
07/07/2020, 12:45 AM
nicholas
alex
07/07/2020, 12:51 AM
@task
def my_task():
    feed = Feed()
    feed.run(max_items=50)
    return feed.name
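For telling individual feeds apart, here is a minimal sketch of the `__repr__` idea raised later in this thread: give the feed class a `__repr__` that leads with its name, so a slow or stuck feed is identifiable wherever its repr is printed or logged. The `Feed` dataclass with `name` and `url` fields is hypothetical, not alex's actual class, and whether a given Prefect version's UI displays an object's repr for mapped task runs is not confirmed here.

```python
from dataclasses import dataclass


@dataclass(repr=False)
class Feed:
    # Hypothetical feed class; the real Feed has its own fields and run().
    name: str
    url: str

    def __repr__(self) -> str:
        # Lead with the feed name so each feed is recognizable in logs,
        # debugger output, or any list of feeds that gets printed.
        return f"Feed({self.name!r}, url={self.url!r})"


if __name__ == "__main__":
    feeds = [
        Feed("hn", "https://example.com/hn.xml"),
        Feed("blog", "https://example.com/blog.xml"),
    ]
    print(feeds)
    # [Feed('hn', url='https://example.com/hn.xml'), Feed('blog', url='https://example.com/blog.xml')]
```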
I was able to see `my_task`
-> etc. Ideally, for the shared example, I'd like to be able to see every feed individually (so I can check each one's status, since they may run for different lengths of time or get stuck) instead of just a `Constant[list]`. 😬
nicholas
alex
07/07/2020, 1:29 AM
`__repr__` method) to show a description? For example, one of the feeds below took a bit longer than the others, but it's hard to identify which one it is.
`map` would be ideal for applying a task to primitive items like a list of numbers or filenames. In my case, being able to see information about the feeds at a glance would be very helpful. Could you give me any guidance on how to achieve something like that with minimal code changes? For reference, I have around 20 feeds.
nicholas
Pedro Machado
07/07/2020, 4:17 AM