Jake Schmidt

12/18/2019, 4:33 PM
Hi gang, a(nother) question about nested fanouts: I have a `dict` where the values are variable-length lists. I’d like to map a fanout on each key, then a fanout for each element in that key’s value, such that the operation on each element also receives the result of the “outer fanout’s” operation on its key. Would the best approach be to use a nested flow? I saw https://github.com/PrefectHQ/prefect/issues/1311 is similar...

Zachary Hughes

12/18/2019, 6:00 PM
Hi @Jake Schmidt, great question! You can definitely run a flow from another flow, though it's not supported in a "first-class" way. Interestingly, we just had a meeting to discuss something similar yesterday. If you're up for it, I'd love to ask a few more questions about your use case:
• would you still be trying to run nested flows if nested mapping was available at the task level?
• once your operations fan out, do you anticipate them fanning back in?

Jake Schmidt

12/18/2019, 6:01 PM
1) probably not, though I can’t say for sure
2) currently, the inner fan-out needs to fan in, but the outer one doesn’t. Right now I’m solving this by doing multiple parallel `map` tasks, followed by a `zip` task, followed by another `map`.
To be clear, the functionality could also be like this, rather than a simple nested fanout:
for key, varlenlist in jobs.items():
    key_result = key_task(key)
    item_results = []
    for item in varlenlist:
        a = task_a(item, key_result)
        b = task_b(item, key_result)
        item_results.append(a)
        item_results.append(b)
    result = join_task(item_results)
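The map, zip, map workaround described above can be sketched in plain Python. This only illustrates the data reshaping and is not Prefect code: `key_task`, `task_a`, `task_b`, and `join_task` are hypothetical stand-ins for the tasks named in the pseudocode, and the contents of `jobs` are made up for the example.

```python
# Hypothetical stand-ins for the tasks in the pseudocode above.
def key_task(key):
    return key.upper()

def task_a(item, key_result):
    return f"a({item},{key_result})"

def task_b(item, key_result):
    return f"b({item},{key_result})"

def join_task(item_results):
    return "|".join(item_results)

jobs = {"x": [1, 2], "y": [3]}  # made-up example data

# Outer fanout: one call per key.
key_results = {key: key_task(key) for key in jobs}

# Flatten the nested structure into (key, item, key_result) tuples so the
# inner fanout becomes a single flat map.
flat = [
    (key, item, key_results[key])
    for key, items in jobs.items()
    for item in items
]

# Two parallel maps over the same flattened inputs.
a_results = [task_a(item, key_result) for _, item, key_result in flat]
b_results = [task_b(item, key_result) for _, item, key_result in flat]

# Zip the parallel results back together, then fan in once per key.
grouped = {}
for (key, _, _), a, b in zip(flat, a_results, b_results):
    grouped.setdefault(key, []).extend([a, b])

joined = {key: join_task(values) for key, values in grouped.items()}
```

Flattening keeps every mapped step one level deep, at the cost of carrying the key alongside each item so the results can be regrouped before the join.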

Zachary Hughes

12/18/2019, 6:31 PM
Awesome, that's super useful-- thanks Jake!

Jake Schmidt

12/20/2019, 2:43 PM
Do y’all have a timeline for releasing this feature yet?

Zachary Hughes

12/20/2019, 3:24 PM
I've actually got a PR in for a lightweight implementation of kicking off a flow run from a task, if that's at all helpful! I don't have an immediate answer re: timelines for a heavier-duty implementation, but if the lightweight version isn't enough I can ask around for you! https://github.com/PrefectHQ/prefect/pull/1871

Jake Schmidt

12/20/2019, 3:52 PM
Gotcha, I’ll look into this… is there a way to actually merge a nested subflow (maybe `flow.update(…)`), so that we could at least see it in `flow.visualize`? Right now `visualize` seems to only compute/display the outer flow. Is there an example of the use of `flow.update`?

Zachary Hughes

12/20/2019, 7:19 PM
I'm honestly not sure-- let me call in someone who might know better. @Chris White, thoughts?

Chris White

12/20/2019, 7:38 PM
Hey Jake - since subflows are not (yet) a first-class concept, all Prefect tracks are Tasks (and Prefect has no knowledge of what occurs within your Tasks). `flow.update(subflow)` will take all tasks from the subflow and add them to the parent flow. This oftentimes results in two disjoint DAGs being represented, unless you built both flows using the exact same tasks, e.g.
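from prefect import Flow, task

@task
def common_task():
    ...

@task
def task_one(x):
    ...

@task
def task_two(y):
    ...

# Both flows use the same common_task object as an upstream dependency.
with Flow("one") as flow_one:
    a = task_one(common_task)

with Flow("two") as flow_two:
    b = task_two(common_task)

# Adds flow_one's tasks to flow_two; the shared common_task joins the two DAGs.
flow_two.update(flow_one)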
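To see why sharing the exact same task object matters, here is a toy model in plain Python. It is not Prefect's implementation: `ToyTask` and `ToyFlow` are hypothetical classes that only assume a flow is a set of task objects plus edges, and that an update takes the union of both.

```python
class ToyTask:
    """Hypothetical task; hashed by object identity, like a distinct task instance."""
    def __init__(self, name):
        self.name = name

class ToyFlow:
    """Hypothetical flow: a set of tasks plus (upstream, downstream) edges."""
    def __init__(self):
        self.tasks = set()
        self.edges = set()

    def add_edge(self, upstream, downstream):
        self.tasks.update([upstream, downstream])
        self.edges.add((upstream, downstream))

    def update(self, other):
        # Union by identity: a task object shared by both flows is stored once.
        self.tasks |= other.tasks
        self.edges |= other.edges

    def n_components(self):
        # Count weakly connected components with a small union-find.
        parent = {t: t for t in self.tasks}

        def find(t):
            while parent[t] is not t:
                t = parent[t]
            return t

        for upstream, downstream in self.edges:
            parent[find(upstream)] = find(downstream)
        return sum(1 for t in self.tasks if parent[t] is t)

# Case 1: both flows are built from the same common task object.
common = ToyTask("common_task")
flow_one, flow_two = ToyFlow(), ToyFlow()
flow_one.add_edge(common, ToyTask("task_one"))
flow_two.add_edge(common, ToyTask("task_two"))
flow_two.update(flow_one)
shared_components = flow_two.n_components()

# Case 2: each flow has its own copy of the common task.
flow_three, flow_four = ToyFlow(), ToyFlow()
flow_three.add_edge(ToyTask("common_task"), ToyTask("task_one"))
flow_four.add_edge(ToyTask("common_task"), ToyTask("task_two"))
flow_four.update(flow_three)
disjoint_components = flow_four.n_components()
```

In this toy model the first merge yields one connected graph, while the second yields two disjoint ones, even though the task names are identical.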