# ask-community
f
Hi! I want to do some data processing with a tree-shaped fanout: I have a list, I want to process each item of that list in parallel, and each of those steps produces multiple work items for the next task, and so on. It's a bit like https://medium.com/the-prefect-blog/map-faster-mapping-improvements-in-prefect-0-12-0-7cacc3f14e16, but with additional branching before the "even more" step. How do I do that?
The best I could come up with is something like:
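```python
from prefect import flow, task

@task
def taskA(segment: str):
    # Each segment fans out into two work items for the next stage
    return [f"a_{segment}", f"b_{segment}"]

@task
def taskB(item: str):
    return item

@flow()
def infer_pipeline(segments: list):
    ra = taskA.map(segments)  # one future per segment
    # Mapping over each future waits for that taskA run to finish,
    # so the taskB runs are submitted one segment at a time, in list order
    return [taskB.map(r) for r in ra]
```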
However, this has a drawback: if the first item in segments takes the longest to process, taskB won't start for any of the other items until the first one is done. (The first future in ra has to be resolved before the list comprehension can move on and start the remaining taskB runs.) Is there a better way around this?
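The scheduling being asked for (start the next stage for an item as soon as its own parent finishes, regardless of list order) can be sketched without Prefect using the standard library's concurrent.futures. This is only an illustration of the idea under stated assumptions: threads stand in for Prefect's task runner, and task_a, task_b, task_c and run_tree are hypothetical names, not Prefect APIs. Each stage takes one item and returns a list of items for the next stage, so any depth of branching works.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def task_a(segment: str) -> list:
    # Hypothetical first stage; "slow" simulates a segment that takes longest
    if segment == "slow":
        time.sleep(0.5)
    return [f"a_{segment}", f"b_{segment}"]

def task_b(item: str) -> list:
    # Hypothetical second level of branching: each item splits again
    return [f"{item}_0", f"{item}_1"]

def task_c(item: str) -> list:
    # Hypothetical leaf stage; returns a list so every stage has the same shape
    return [item.upper()]

def run_tree(roots, stages, max_workers=8):
    """stages[i] maps one item to a list of items for stage i + 1;
    outputs of the last stage are collected as leaves, in completion order."""
    leaves = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each pending future to the index of the stage it is running
        pending = {pool.submit(stages[0], r): 0 for r in roots}
        while pending:
            # Wake up as soon as ANY future finishes, not the first in the list
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                depth = pending.pop(fut)
                children = fut.result()
                if depth + 1 == len(stages):
                    leaves.extend(children)
                else:
                    # Start the next stage for this subtree right away
                    for child in children:
                        pending[pool.submit(stages[depth + 1], child)] = depth + 1
    return leaves

if __name__ == "__main__":
    print(run_tree(["slow", "fast"], [task_a, task_b, task_c]))
```

Because wait(..., return_when=FIRST_COMPLETED) returns whichever futures finish first, a slow first segment only delays its own subtree; in the usage line above, the leaves for "fast" are collected before those for "slow".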
j
Would nesting the tasks work for your use case?
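```python
from prefect import flow, task

# Calling a task from inside another task needs a Prefect version that
# supports nested tasks (Prefect 3 does; Prefect 2 raises a RuntimeError).
@task
def taskA(n: int):
    segments = [f"segment_{n}_{i}" for i in range(n)]
    # Fan out over this item's own segments; only this subtree waits on them
    return [future.result() for future in taskB.map(segments)]

@task
def taskB(segment: str):
    print(segment)
    return segment

@flow
def infer_pipeline():
    counts = [1, 2, 3]
    # All taskA runs are submitted at once, so each subtree proceeds independently
    return [future.result() for future in taskA.map(counts)]

if __name__ == "__main__":
    infer_pipeline()
```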
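The nesting idea can be mirrored in plain Python with concurrent.futures (again a stand-in for illustration, not Prefect's API; stage_a, stage_b and pipeline are hypothetical names). Each top-level item gets its own inner pool, so its subtree proceeds independently of the others. Using a separate inner pool, rather than submitting back into the outer one, avoids a thread-pool deadlock where every outer worker blocks waiting on inner work that has no free thread to run on.

```python
from concurrent.futures import ThreadPoolExecutor

def stage_b(segment: str) -> str:
    # Leaf work; identity here, like taskB in the answer above
    return segment

def stage_a(n: int) -> list:
    segments = [f"segment_{n}_{i}" for i in range(n)]
    # This subtree's own pool: a slow subtree never blocks another one
    with ThreadPoolExecutor(max_workers=4) as inner:
        return list(inner.map(stage_b, segments))

def pipeline(counts: list) -> list:
    # max_workers must be at least 1, even for an empty input
    with ThreadPoolExecutor(max_workers=len(counts) or 1) as outer:
        # Executor.map returns results in input order
        return list(outer.map(stage_a, counts))

if __name__ == "__main__":
    print(pipeline([1, 2, 3]))
```

The trade-off of this shape is that each stage_a call holds an outer worker while its whole subtree runs, so the outer pool size caps how many subtrees can make progress at once.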