Trevor Kramer
05/01/2021, 6:47 PM
from prefect import Flow, task

@task
def add_ten(x, y):
    return x + y

@task()
def log_result(x):
    print(x)

with Flow('simple map') as flow:
    mapped_result = add_ten.map([1, 2, 3], [10, 11, 12])
    log_result(mapped_result)

if __name__ == '__main__':
    from prefect.run_configs import LocalRun
    flow.run_config = LocalRun()
    flow.run()
I was expecting this code to return 9 results instead of the 3 it actually returned. Is there a way to have map do the pairwise enumeration? I assumed that because neither argument was marked as unmapped, both would be looped over.
Chris White
Mapping behaves like zip - there are a few GitHub issues on this pattern that you should check out; they also show how you can achieve this:
• https://github.com/PrefectHQ/prefect/issues/1986#issuecomment-583188939
• https://github.com/PrefectHQ/prefect/issues/3416
Trevor Kramer
05/01/2021, 7:12 PM
import itertools
from typing import Any

from prefect import Flow, task

@task()
def product(x, y) -> list[tuple[Any, ...]]:
    return list(itertools.product(x, y))

@task()
def add_ten(x, y):
    return x + y

@task()
def log_result(x):
    print(x)

with Flow('simple map') as flow:
    mapped_result = add_ten.map(*product([1, 2, 3], [10, 11, 12]))
    log_result(mapped_result)

if __name__ == '__main__':
    from prefect.run_configs import LocalRun
    flow.run_config = LocalRun()
    flow.run()
Or do you have to pass the tuple into the mapped task?
Chris White
Split the pairs into two lists in the product task (also note the nout=2 in the task constructor):
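from typing import List, Tuple

@task(nout=2)
def product(x, y) -> Tuple[List, List]:
    # build every (a, b) pair, then split the pairs into two parallel lists
    products = list(itertools.product(x, y))
    first_arg = [a for (a, b) in products]
    second_arg = [b for (a, b) in products]
    return first_arg, second_arg
...
with Flow('simple map') as flow:
    # call the task first, then index its two outputs
    pairs = product([1, 2, 3], [10, 11, 12])
    mapped_result = add_ten.map(pairs[0], pairs[1])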
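For reference, here is a plain-Python sketch of why splitting the product works, with no Prefect involved. It assumes map pairs its inputs element by element, like the built-in zip, so feeding it the two split lists gives one call per combination. The helper name split_product is made up for illustration.

```python
import itertools


def split_product(x, y):
    """Return two parallel lists that together cover every (a, b) combination."""
    pairs = list(itertools.product(x, y))
    return [a for a, _ in pairs], [b for _, b in pairs]


xs, ys = [1, 2, 3], [10, 11, 12]

# zip-style pairing of the raw inputs: one result per position
zipped = [a + b for a, b in zip(xs, ys)]

# zip-style pairing of the split product: one result per combination
first, second = split_product(xs, ys)
crossed = [a + b for a, b in zip(first, second)]

print(len(zipped), len(crossed))
```

The first count is the length of the inputs; the second is the product of their lengths.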
Trevor Kramer
05/01/2021, 10:43 PM
@task(nout=2)
def product(x: List, y: List) -> Tuple[List, List]:
    return tuple(zip(*itertools.product(x, y)))
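A quick plain-Python check of what this one-liner returns may be useful. It transposes the list of pairs, so it yields tuples rather than lists, and for an empty input it yields an empty tuple rather than two empty sequences, which presumably would not unpack into the two outputs that nout=2 expects. The function name product_transposed is illustrative only.

```python
import itertools


def product_transposed(x, y):
    # zip(*pairs) turns [(a1, b1), (a2, b2), ...] into (a1, a2, ...), (b1, b2, ...)
    return tuple(zip(*itertools.product(x, y)))


first, second = product_transposed([1, 2], [10, 11])
print(first, second)

# With an empty input there are no pairs to transpose
empty = product_transposed([], [10, 11])
print(empty)
```

If empty inputs are possible, the list-comprehension version earlier in the thread is the safer choice, since it always returns two lists.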