Another issue I came across arises when you combine nout > 1 and .map:
The multiple return values from the nout > 1 task are not iterated correctly when passed as args to the next task.
from typing import Tuple

from prefect import Flow, Task, task

@task(nout=2)
def step1(num) -> Tuple[int, str]:
    chars = ['a', 'b', 'c']
    return num, chars[num - 1]

@task
def step2(arg1, arg2):
    print(f"arg1={arg1} arg2={arg2}")

def unzip(nout: int) -> Task:
    @task(nout=nout)
    def _unzip(res) -> tuple:
        return tuple(zip(*res))
    return _unzip

with Flow("nout test") as flow:
    numbers = [1, 2, 3]

    # bad
    a, b = step1.map(num=numbers)
    step2.map(arg1=a, arg2=b)
    # prints:
    # arg1=1 arg2=2
    # arg1=a arg2=b

    # good - with unzip workaround
    c, d = unzip(2)(step1.map(num=numbers))
    step2.map(arg1=c, arg2=d)
    # prints:
    # arg1=1 arg2=a
    # arg1=2 arg2=b
    # arg1=3 arg2=c

if __name__ == "__main__":
    flow.run()
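The pairing in the "bad" output can be reproduced without Prefect. This is a minimal plain-Python sketch, assuming that unpacking the mapped nout=2 result behaves like taking elements 0 and 1 of the list of per-item results. That assumption is inferred from the printed output above, not from Prefect's source, and mapped_results, bad_pairs and good_pairs are illustrative names only.

```python
def step1(num):
    chars = ['a', 'b', 'c']
    return num, chars[num - 1]

numbers = [1, 2, 3]

# Stand-in for step1.map(num=numbers): one (int, str) tuple per input.
mapped_results = [step1(n) for n in numbers]

# ASSUMPTION: `a, b = step1.map(...)` acts like indexing positions 0 and 1,
# so a and b are the first two result tuples and the third is dropped.
a, b = mapped_results[0], mapped_results[1]
bad_pairs = list(zip(a, b))    # what a second map over (a, b) would see

# The unzip workaround transposes the results before unpacking.
c, d = tuple(zip(*mapped_results))
good_pairs = list(zip(c, d))   # what a second map over (c, d) would see

print(bad_pairs)
print(good_pairs)
```

Under that assumption, bad_pairs matches the two mismatched lines printed by the first variant and good_pairs matches the three lines printed by the workaround.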
Chris White
03/04/2021, 4:41 AM
This is expected; the type signature for a mapped task is essentially Iterable -> Iterable and you’re looking for a reshaping of that to something like Iterable[Tuple] -> Tuple[Iterable], hence the need for your unzip intermediate task
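The reshaping described here, Iterable[Tuple] -> Tuple[Iterable], is a transpose, and in plain Python it can be sketched with zip. The helper name unzip and the sample rows are illustrative only; this is not Prefect API, just the operation the intermediate task performs on its input.

```python
from typing import Any, Iterable, Tuple

def unzip(rows: Iterable[Tuple[Any, ...]]) -> Tuple[Tuple[Any, ...], ...]:
    """Transpose an iterable of equal-length tuples into a tuple of columns."""
    return tuple(zip(*rows))

rows = [(1, 'a'), (2, 'b'), (3, 'c')]   # shape: Iterable[Tuple]
columns = unzip(rows)                   # shape: Tuple[Iterable]
nums, letters = columns

# Applying the same transpose again recovers the original rows.
rows_again = list(zip(*columns))

empty = unzip([])
```

Note that an empty input yields an empty tuple, so unpacking it into a fixed number of names would fail.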
Chris White
03/04/2021, 4:41 AM
Also, side note — would you mind posting large code blocks like this into the thread? It helps keep the main channel more organized and concise
Daniel Caldeweyher
03/04/2021, 5:41 AM
Thank you. I will post in a snippet or thread next time.
I guess I was surprised by the behaviour, as it is not immediately obvious and not mentioned in the documentation.
Jim Crist-Harif
03/04/2021, 2:50 PM
No, I do think this is a bug. The nout parameter shouldn't take effect for a mapped output (you should have gotten an error in the first one). But Chris's overall answer is correct, the output of