Florian Laws
01/16/2025, 12:54 PM
Florian Laws
01/16/2025, 1:30 PM
@task
def taskA(segment: str):
    return [f"a_{segment}", f"b_{segment}"]

@task
def taskB(input: str):
    return input

@flow()
def infer_pipeline():
    ra = taskA.map(segments)
    return [taskB.map(r) for r in ra]
However, this has the drawback that if the first item in segments takes the longest to process, taskB won't be started for the other items until the first one is done (the first future in ra has to be resolved before the list comprehension can move on to starting the remaining taskBs).
Is there a better way around this?
Jake Kaplan
01/16/2025, 2:31 PM
import prefect
import os
from prefect import flow, task
@task
def taskA(n: int):
    segments = [f"segment_{n}_{i}" for i in range(n)]
    return [future.result() for future in taskB.map(segments)]

@task
def taskB(segment: str):
    print(segment)
    return segment

@flow
def infer_pipeline():
    l = [1, 2, 3]
    return [future.result() for future in taskA.map(l)]

if __name__ == "__main__":
    infer_pipeline()
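
The reply above moves the taskB fan-out inside taskA, so each item's downstream work starts as soon as that item is ready rather than waiting on the first future. The same scheduling idea can be sketched without Prefect, using only the standard library. This is an analogy under stated assumptions, not Prefect code: stage_a, stage_b, run_pipeline, completion_order and the delays are hypothetical names, and ThreadPoolExecutor stands in for the task runner.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Records the order in which stage-B calls actually run (illustration only).
completion_order = []


def stage_b(item: str) -> str:
    # Hypothetical stand-in for taskB.
    completion_order.append(item)  # list.append is thread-safe in CPython
    return item


def stage_a(segment: str, delay: float, inner: ThreadPoolExecutor) -> list[str]:
    # Hypothetical stand-in for taskA; the sleep simulates uneven processing time.
    time.sleep(delay)
    items = [f"a_{segment}", f"b_{segment}"]
    # Fan out to stage B from inside stage A, so this segment's downstream
    # work does not wait on any other segment.
    return list(inner.map(stage_b, items))


def run_pipeline(segments: dict[str, float]) -> list[list[str]]:
    # Two pools: stage-A workers block on stage B, so stage B needs its own workers.
    with ThreadPoolExecutor(max_workers=max(1, len(segments))) as outer, \
            ThreadPoolExecutor(max_workers=4) as inner:
        futures = [
            outer.submit(stage_a, seg, delay, inner)
            for seg, delay in segments.items()
        ]
        # Results come back in submission order, even though the work itself
        # finished in whatever order the delays allowed.
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_pipeline({"slow": 0.3, "fast": 0.0}))
    print(completion_order)
```

In this sketch the "slow" segment is submitted first, yet the stage-B calls for "fast" run before it finishes, which is the behavior the original question was after. The trade-off is that the outer stage stays occupied until its inner work completes.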