Stephane Boisson

02/17/2020, 10:48 PM
I noticed that if I perform a map on two lists, `my_task.map([1, 2, 3], ["A", "B"])` will execute 2 tasks, for (1, "A") and (2, "B"). Is there an easy way to map over the 3x2 combos and get 6 tasks, for (1, "A"), (2, "A"), (3, "A"), (1, "B"), (2, "B"), (3, "B")?
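
For reference, the pairing described above behaves like Python's built-in `zip`, while the six requested combinations correspond to `itertools.product`. A minimal plain-Python sketch of the difference, with no Prefect involved; the variable names are illustrative only:

```python
import itertools

numbers = [1, 2, 3]
letters = ["A", "B"]

# Element-wise pairing stops at the shorter list, like the two mapped tasks above.
zipped = list(zip(numbers, letters))

# The Cartesian product yields every combination (the last iterable varies fastest).
combos = list(itertools.product(numbers, letters))

print(zipped)
print(len(combos))
```

The product comes out in a different order than the one listed in the question, but it covers the same six pairs.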

Chris White

02/17/2020, 11:39 PM
Hi Stephane - check out this issue for how to achieve this: https://github.com/PrefectHQ/prefect/issues/1986

itay livni

07/08/2020, 3:42 PM
Hi - I am trying to use the example in the GitHub issue to map over two lists that are inputs to one task, but I am running into a `TypeError` that I don't really understand. (1) With `apply_map`, is there an updated Prefect way to do this? (2) What is the proper way to implement this?
from prefect import task, case, Flow, Parameter

import itertools

@task
def product(x, y):
    return list(itertools.product(x, y))

@task
def add(x, y):
    return f"{x} and {y}"

with Flow("test") as flow:
    name_lst = Parameter("name_lst", default=["a", "b"])
    number_lst = Parameter("number_lst", default=[1, 2])
    
    res = add.map(product(name_lst, number_lst))
st = flow.run()
print(st.result[res].result)
Thanks

josh

07/08/2020, 3:50 PM
@itay livni there was a thread similar to this the other day that may answer your question https://prefect-community.slack.com/archives/CL09KU1K7/p1593792821182500 What is the type error that you are seeing?

itay livni

07/08/2020, 9:27 PM
@josh I tried following the code in the thread with your suggestions.
from prefect import task, Flow, Parameter

import itertools

@task
def cross_product(x, y) -> list:
    return list(itertools.product(x, y))

@task
def concat(a:str, b:str) -> str:
    return a + b

a = ['d', 'o', 'g']
b = ['c', 'a', 't']

with Flow(name='zip-map-test') as flow:
    cross_result = cross_product(a, b)
    result = concat.map(cross_result)


st = flow.run()
st.result[result].result
And got
Traceback (most recent call last):
  File "/home/ilivni/egm_make_lmap/tst_mapping.py", line 25, in <module>
    result = concat.map(cross_result)
  File "/home/ilivni/prefect/src/prefect/core/task.py", line 583, in map
    *args, mapped=True, upstream_tasks=upstream_tasks, flow=flow, **kwargs
  File "/home/ilivni/prefect/src/prefect/core/task.py", line 511, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/home/ilivni/miniconda3/lib/python3.7/inspect.py", line 3015, in bind
    return args[0]._bind(args[1:], kwargs)
  File "/home/ilivni/miniconda3/lib/python3.7/inspect.py", line 2930, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'b'
Thoughts?

josh

07/08/2020, 9:31 PM
@itay livni The cross_product task makes a single list of:
[('d', 'c'), ('d', 'a'), ('d', 't'), ('o', 'c'), ('o', 'a'), ('o', 't'), ('g', 'c'), ('g', 'a'), ('g', 't')]
Each of those elements is then passed into the map, but only into the `a` slot, because it is mapping over single tuple arguments. One option is for the task to accept the whole tuple:
@task
def concat(a) -> str:
    return a[0] + a[1]
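
The failure in the traceback comes from binding a single positional argument against a two-parameter signature. A minimal sketch that reproduces this with the standard library alone; here `concat` is a plain function standing in for the task, and `pairs` stands in for the output of `cross_product`:

```python
import inspect
import itertools

def concat(a: str, b: str) -> str:
    return a + b

pairs = list(itertools.product(["d", "o", "g"], ["c", "a", "t"]))

# Binding one positional argument to a two-parameter signature fails,
# the same way the signature.bind call in the traceback does.
try:
    inspect.signature(concat).bind(pairs)
    bind_error = None
except TypeError as exc:
    bind_error = str(exc)

print(bind_error)

# A pair only fills both parameters when it is unpacked explicitly.
joined = [concat(*pair) for pair in pairs]
print(joined[:3])
```

Nothing unpacks the tuples automatically, so each one arrives whole in the first parameter.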

itay livni

07/08/2020, 9:57 PM
@josh Thank you. I was under the impression that some magic splat happened. This is the script that gets the results I would expect. However, it does mean modifying the concat task to accept a tuple, which I am unable to do in my case (other flows use the concat task unmapped). Is there another way?
import itertools

from prefect import task, Flow

@task
def cross_product(x, y) -> list:
    return list(itertools.product(x, y))

@task
def concat(a: tuple, b: tuple) -> str:
    # Both arguments receive the same pair from cross_result.
    return f"{a[0]} and {b[1]}"

a = ['a', 'b']
b = [1, 2]

with Flow(name='zip-map-test') as flow:
    cross_result = cross_product(a, b)
    res = concat.map(cross_result, cross_result)


st = flow.run()

flow.visualize()
print(st.result[res].result)
flow.visualize(flow_state=st)
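
The thread does not record an answer to that last question. Two possible ways to leave the two-argument `concat` untouched are sketched below in plain Python. This is an untested idea rather than a confirmed Prefect pattern, and `cross_product_columns` and `concat_pair` are hypothetical helper names that do not appear in the thread:

```python
import itertools

def concat(a: str, b: int) -> str:
    # Stand-in for the shared task body; its two-argument signature is unchanged.
    return f"{a} and {b}"

def cross_product_columns(x, y):
    # Hypothetical helper: returns two aligned lists instead of one list of pairs.
    pairs = list(itertools.product(x, y))
    firsts = [p[0] for p in pairs]
    seconds = [p[1] for p in pairs]
    return firsts, seconds

def concat_pair(pair):
    # Hypothetical wrapper: unpacks one pair and delegates to the original function.
    return concat(*pair)

# Option 1: iterate over the two aligned lists element-wise.
firsts, seconds = cross_product_columns(["a", "b"], [1, 2])
by_columns = [concat(a, b) for a, b in zip(firsts, seconds)]

# Option 2: iterate over the pairs through the thin wrapper.
by_wrapper = [concat_pair(p) for p in itertools.product(["a", "b"], [1, 2])]

print(by_columns)
```

In a flow, the first option would presumably mean mapping `concat` over the two aligned lists as separate arguments. The second would mean registering the wrapper as its own task, so that other flows keep using `concat` unmapped.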