Trevor Kramer

05/01/2021, 6:47 PM
from prefect import Flow, task

@task
def add_ten(x, y):
    return x + y

@task()
def log_result(x):
    print(x)


with Flow('simple map') as flow:
    mapped_result = add_ten.map([1, 2, 3], [10, 11, 12])
    log_result(mapped_result)
if __name__ == '__main__':
    from prefect.run_configs import LocalRun
    flow.run_config = LocalRun()
    flow.run()
I was expecting this code to return 9 results instead of the 3 actually returned. Is there a way to have map enumerate every pairwise combination? I assumed that because neither argument was marked as unmapped, both would be looped over.

Chris White

05/01/2021, 7:02 PM
Hi Trevor, Prefect mapping behaves analogously to Python’s `zip`. There are a few GitHub issues on this pattern that you should check out; they also show how you can achieve this:
• https://github.com/PrefectHQ/prefect/issues/1986#issuecomment-583188939
• https://github.com/PrefectHQ/prefect/issues/3416
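To make the zip analogy concrete, here is a minimal plain-Python sketch (no Prefect involved) of the difference between the two behaviors, using the same inputs as the flow above. The variable names `xs`, `ys`, `zipped`, and `crossed` are only illustrative.

```python
import itertools

xs = [1, 2, 3]
ys = [10, 11, 12]

# zip pairs elements by position, which is how the mapped task
# in the flow above received its arguments: 3 calls.
zipped = [x + y for x, y in zip(xs, ys)]

# itertools.product enumerates every (x, y) combination: 9 calls.
crossed = [x + y for x, y in itertools.product(xs, ys)]

print(zipped)   # [11, 13, 15]
print(crossed)  # [11, 12, 13, 12, 13, 14, 13, 14, 15]
```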

Trevor Kramer

05/01/2021, 7:12 PM
Thanks. Is there a way to separate the tuple after the product - like this?
import itertools
from typing import Any

from prefect import Flow, task


@task()
def product(x, y) -> list[tuple[Any, ...]]:
    return list(itertools.product(x, y))


@task()
def add_ten(x, y):
    return x + y


@task()
def log_result(x):
    print(x)


with Flow('simple map') as flow:
    mapped_result = add_ten.map(*product([1, 2, 3], [10, 11, 12]))
    log_result(mapped_result)
if __name__ == '__main__':
    from prefect.run_configs import LocalRun

    flow.run_config = LocalRun()
    flow.run()
or do you have to pass the tuple into the mapped task?

Chris White

05/01/2021, 7:34 PM
I think you’ll need to do some extra manipulation within your `product` task (also note the `nout=2` in the task constructor):
@task(nout=2)
def product(x, y) -> Tuple[List, List]:
    products = list(itertools.product(x, y))
    first_arg = [a for (a, b) in products]
    second_arg = [b for (a, b) in products]
    return first_arg, second_arg 

...

    # call the task first, then index its two outputs
    products = product([1, 2, 3], [10, 11, 12])
    mapped_result = add_ten.map(products[0], products[1])

Trevor Kramer

05/01/2021, 10:43 PM
Thanks! This can be simplified a bit
@task(nout=2)
def product(x: List, y: List) -> Tuple[List, List]:
    return tuple(zip(*itertools.product(x, y)))
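For reference, a small plain-Python sketch of what that one-liner produces, without the `@task` decorator so it runs outside a flow. The helper name `split_product` is hypothetical. Two things to keep in mind: the result is a pair of tuples rather than lists, and if either input is empty the result is an empty tuple, so unpacking it into two values would fail.

```python
import itertools

def split_product(x, y):
    # Transpose the list of (a, b) pairs into one sequence of
    # first elements and one sequence of second elements.
    return tuple(zip(*itertools.product(x, y)))

first_arg, second_arg = split_product([1, 2, 3], [10, 11, 12])
print(first_arg)   # (1, 1, 1, 2, 2, 2, 3, 3, 3)
print(second_arg)  # (10, 11, 12, 10, 11, 12, 10, 11, 12)

# Zipping the two sequences back together gives the 9 pairwise sums.
sums = [a + b for a, b in zip(first_arg, second_arg)]
print(sums)        # [11, 12, 13, 12, 13, 14, 13, 14, 15]

# Edge case: an empty input yields an empty tuple, not two sequences.
empty = split_product([], [10, 11, 12])
print(empty)       # ()
```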