Tomer Cagan
03/23/2022, 7:31 AM
@task(nout=2)
def load_stuff() -> Tuple[List[int], List[int]]:
    return [1, 2, 3], [4, 5, 6]

@task(nout=2)
def process_stuff(low: int, high: int) -> Tuple[int, int]:
    return low * low, high * high

@task
def post_process(lows: int, highs: int):
    print(lows)
    print(highs)

with Flow("map-multi") as flow2:
    v_l, v_h = load_stuff()
    p_l, p_h = process_stuff.map(v_l, v_h)
    post_process(p_l, p_h)

if __name__ == "__main__":
    flow2.run()
I am getting an error on `p_l, p_h = process_stuff.map(v_l, v_h)`:
Traceback (most recent call last):
  File "/Users/tomercagan/dev/nextresearch/bin/run_projection.py", line 98, in <module>
    p_l, p_h = process_stuff.map(v_l, v_h)
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/prefect/core/task.py", line 998, in __iter__
    raise TypeError(
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
I have done both type-hinting and `nout` but this fails.
# load_stuff and process_stuff as above and
@task
def post_process(results: List[Tuple[int, int]]):
    lows, highs = list(zip(*results))
    print(lows)
    print(highs)

with Flow("map-multi") as flow2:
    v_l, v_h = load_stuff()
    results = process_stuff.map(v_l, v_h)
    post_process(results)
And it will work, but I am still wondering about the first implementation (which I think is more natural).
Pim Claessens
03/23/2022, 9:24 AM
@task(nout=2)
def post_process(results: List[Tuple[int, int]]):
    lows, highs = list(zip(*results))
    return lows, highs
Tomer Cagan
03/23/2022, 11:40 AM
With `nout=2` it does not work.
Note that it works for the first task (`load_stuff`) but not for the second task, which is mapped over the output of the first.
Also note that if I capture the output of the mapped task as a single return value, it properly gives me a list of tuples.
Maybe a mapped task cannot properly separate the return values of each call?
Anna Geller
`process_stuff` returns a Tuple. Returning tuples doesn't work with mapped tasks; you can read more about that in this issue.
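To make the shape mismatch concrete, here is a minimal plain-Python sketch that does not use Prefect at all. The `fake_map` helper is hypothetical: it only imitates the "one result per input element" behavior of a mapped call and is not how Prefect implements mapping.

```python
from typing import Callable, List, Tuple


def process_stuff(low: int, high: int) -> Tuple[int, int]:
    return low * low, high * high


def fake_map(
    fn: Callable[[int, int], Tuple[int, int]],
    lows: List[int],
    highs: List[int],
) -> List[Tuple[int, int]]:
    # Hypothetical stand-in for a mapped call: one result per input pair.
    return [fn(low, high) for low, high in zip(lows, highs)]


results = fake_map(process_stuff, [1, 2, 3], [4, 5, 6])
print(results)  # [(1, 16), (4, 25), (9, 36)]

# The result is a list of three tuples, not a pair of lists,
# so unpacking it into two names fails in plain Python as well.
unpack_error = ""
try:
    p_l, p_h = results
except ValueError as exc:
    unpack_error = str(exc)
    print(unpack_error)
```

The mapped output has one entry per input element, so splitting it into "all lows" and "all highs" needs an explicit transposition step afterwards.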
Here is one way you could handle that:
from prefect import task, Flow
from typing import List, Tuple

@task(nout=2)
def load_stuff() -> Tuple[List[int], List[int]]:
    return [1, 2, 3], [4, 5, 6]

@task(nout=2)
def process_stuff(low: int, high: int) -> List[int]:
    return [low * low, high * high]

@task(log_stdout=True)
def post_process(nr: int) -> None:
    print(nr)

with Flow("map-multi") as flow:
    v_l, v_h = load_stuff()
    results = process_stuff.map(v_l, v_h)
    post_process.map(results)

if __name__ == "__main__":
    flow.run()
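If the two separate lists from the first implementation are still wanted downstream, one possible approach is to transpose the mapped results in a separate, non-mapped step. The sketch below shows only the plain-Python transposition. The name `unzip_pairs` is hypothetical, and wrapping it with `@task(nout=2)` inside the flow is an assumption rather than something confirmed in this thread.

```python
from typing import List, Sequence, Tuple


def unzip_pairs(pairs: Sequence[Sequence[int]]) -> Tuple[List[int], List[int]]:
    """Transpose [(low, high), ...] into ([low, ...], [high, ...])."""
    if not pairs:
        # zip(*[]) yields nothing, so handle the empty case explicitly.
        return [], []
    lows, highs = zip(*pairs)
    return list(lows), list(highs)


# Same shape as the mapped output: one [low, high] entry per input element.
lows, highs = unzip_pairs([[1, 16], [4, 25], [9, 36]])
print(lows)   # [1, 4, 9]
print(highs)  # [16, 25, 36]
```

In a flow, such a step would sit between `process_stuff.map(v_l, v_h)` and `post_process`, so that only a regular (non-mapped) task is unpacked into two values.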
Tomer Cagan
03/23/2022, 12:46 PM