Florian Laws
01/16/2025, 12:54 PM
Florian Laws
01/16/2025, 1:30 PM
@task
def taskA(segment: str):
    return [f"a_{segment}", f"b_{segment}"]

@task
def taskB(input: str):
    return input

@flow()
def infer_pipeline():
    ra = taskA.map(segments)
    return [taskB.map(r) for r in ra]
However, this has the drawback that if the first item in segments takes the longest to process, taskB won't be started for the other items until the first one is done (the first future in ra has to be resolved before the list comprehension can move on and start the taskB runs for the remaining items).
Is there a better way around this?
Jake Kaplan
01/16/2025, 2:31 PM
import prefect
import os
from prefect import flow, task

@task
def taskA(n: int):
    segments = [f"segment_{n}_{i}" for i in range(n)]
    return [future.result() for future in taskB.map(segments)]

@task
def taskB(segment: str):
    print(segment)
    return segment

@flow
def infer_pipeline():
    l = [1, 2, 3]
    return [future.result() for future in taskA.map(l)]

if __name__ == "__main__":
    infer_pipeline()
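
The reply above appears to move the taskB fan-out inside taskA, so each taskA run starts its own taskB runs as soon as its own result is ready instead of waiting for the flow to iterate over the earlier futures. As a rough illustration of why that helps, here is a standard-library analogy using `concurrent.futures`. It is not Prefect code; `stage_a`, `stage_b`, `run_in_order`, `run_nested`, and the `delays` argument are hypothetical names invented for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def stage_a(segment: str, delay: float) -> list[str]:
    # Hypothetical stand-in for taskA; the sleep simulates uneven processing time.
    time.sleep(delay)
    return [f"a_{segment}", f"b_{segment}"]


def stage_b(item: str, started: list[str]) -> str:
    # Hypothetical stand-in for taskB; records the order in which items begin.
    started.append(item)
    return item


def run_in_order(segments: list[str], delays: list[float]):
    # Pattern from the question: the caller walks the stage_a futures in order,
    # so a slow first item delays the stage_b submissions for every later item.
    started: list[str] = []
    with ThreadPoolExecutor(4) as pool_a, ThreadPoolExecutor(4) as pool_b:
        a_futures = [pool_a.submit(stage_a, s, d) for s, d in zip(segments, delays)]
        b_futures = [
            [pool_b.submit(stage_b, item, started) for item in fa.result()]
            for fa in a_futures
        ]
        results = [[fb.result() for fb in group] for group in b_futures]
    return results, started


def run_nested(segments: list[str], delays: list[float]):
    # Pattern from the reply: each stage_a worker fans out to stage_b itself,
    # so fast items proceed without waiting for slow ones.
    started: list[str] = []
    # Two separate pools, so workers waiting on stage_b cannot starve stage_b.
    with ThreadPoolExecutor(4) as pool_a, ThreadPoolExecutor(4) as pool_b:

        def a_then_b(segment: str, delay: float) -> list[str]:
            items = stage_a(segment, delay)
            futures = [pool_b.submit(stage_b, item, started) for item in items]
            return [f.result() for f in futures]

        outer = [pool_a.submit(a_then_b, s, d) for s, d in zip(segments, delays)]
        results = [f.result() for f in outer]
    return results, started


if __name__ == "__main__":
    print(run_in_order(["slow", "fast"], [0.3, 0.0])[1])
    print(run_nested(["slow", "fast"], [0.3, 0.0])[1])
```

Both variants return the same results in the same order; only the order in which the second stage starts differs. Whether the nested form is the better fit in Prefect itself depends on the task runner and concurrency settings in use, which this sketch does not model.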