Daniel Caldeweyher
03/03/2021, 11:26 PM
from typing import Tuple
from prefect import Flow, Task, task
@task(nout=2)
def step1(num) -> Tuple[int, str]:
    """Pair ``num`` with the letter found at 1-based position ``num``.

    The letters are taken from ``['a', 'b', 'c']``, so ``num=1`` yields
    ``(1, 'a')``. The task is declared with ``nout=2`` so that its
    two-element result can be unpacked where the flow is built.

    Args:
        num: 1-based position into the letter list.

    Returns:
        A ``(num, letter)`` tuple.
    """
    letters = ['a', 'b', 'c']
    # Positions are 1-based, hence the shift by one before indexing.
    letter = letters[num - 1]
    return num, letter
@task
def step2(arg1, arg2):
    """Print both arguments on a single line as ``arg1=... arg2=...``.

    Args:
        arg1: First value to display.
        arg2: Second value to display.
    """
    message = f"arg1={arg1} arg2={arg2}"
    print(message)
def unzip(nout: int) -> Task:
    """Build a task that transposes a sequence of tuples.

    The returned task turns a list of per-item tuples (such as the result
    of a mapped task) into a tuple of per-position sequences, and is
    declared with ``nout`` outputs so its result can be unpacked.

    Args:
        nout: Number of outputs the generated task is declared with.

    Returns:
        The ``_unzip`` task created for the requested ``nout``.
    """

    @task(nout=nout)
    def _unzip(res) -> tuple:
        # zip(*res) regroups the i-th element of every item together.
        columns = zip(*res)
        return tuple(columns)

    return _unzip
# Demo flow contrasting direct unpacking of a mapped task's result with
# unpacking after an explicit unzip step.
with Flow("nout test") as flow:
    numbers = [1, 2, 3]

    # bad - unpack the mapped result directly. As the output below shows,
    # `a` and `b` end up holding the first two result tuples rather than
    # the numbers and the letters.
    mapped_bad = step1.map(num=numbers)
    a, b = mapped_bad
    step2.map(arg1=a, arg2=b)
    # prints:
    # arg1=1 arg2=2
    # arg1=a arg2=b

    # good - with unzip workaround: transpose the mapped result first, so
    # `c` receives the numbers and `d` receives the letters.
    mapped_good = step1.map(num=numbers)
    c, d = unzip(2)(mapped_good)
    step2.map(arg1=c, arg2=d)
    # prints:
    # arg1=1 arg2=a
    # arg1=2 arg2=b
    # arg1=3 arg2=c

# Only execute the flow when this file is run as a script.
if __name__ == "__main__":
    flow.run()
Chris White
Iterable -> Iterable
and you’re looking for a reshaping of that to something like `Iterable[Tuple] -> Tuple[Iterable]`, hence the need for your `unzip` intermediate task
Chris White
Daniel Caldeweyher
03/04/2021, 5:41 AM
Jim Crist-Harif
03/04/2021, 2:50 PM
The `nout` parameter shouldn't take effect for a mapped output (you should have gotten an error in the first one). But Chris's overall answer is correct: the output of `step1.map` is a list of tuples, not a tuple of lists.
Jim Crist-Harif
03/04/2021, 3:23 PM