Bob Colner

07/06/2020, 6:40 PM
Hi all, I'm struggling to get the 'zip' mapping pattern to work; I'm probably missing something basic here. I'm trying to map a task with 2 parameters. I'm looking at the reply to this GitHub issue, where it is suggested to use the
function to map over multiple tasks. However, I can't get this to work in a simple example:
import itertools
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor

def cross_product(x, y) -> list:
    return list(itertools.product(x, y))

def concat(a:str, b:str) -> str:
    return a + b

a = ['d', 'o', 'g']
b = ['c', 'a', 't']

with Flow(name='zip-map-test') as flow:
    result =, b))

executor = LocalDaskExecutor(scheduler='threads')

flow_state =
when I run this flow I get:
TypeError: missing a required argument: 'b'

Kyle Moon-Wright

07/06/2020, 6:48 PM
For that particular error, the
function you're mapping is looking for two arguments - whereas you only have
cross_product(a, b)
as the first arg.
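The same message can be reproduced without Prefect. This is a minimal sketch, assuming (the thread does not say so) that the mapped function's signature is bound against one list element at a time. It uses only the standard-library `inspect` module; `try_bind` is a hypothetical helper introduced for illustration.

```python
import inspect


def concat(a: str, b: str) -> str:
    return a + b


pair = ('d', 'c')  # one element of the cross-product list


def try_bind(*args):
    """Hypothetical helper: return the TypeError message raised when binding
    args to concat's signature, or None if the binding succeeds."""
    try:
        inspect.signature(concat).bind(*args)
    except TypeError as exc:
        return str(exc)
    return None


# The whole tuple is bound to `a`, so nothing is left for `b`.
single_arg_error = try_bind(pair)

# Unpacking the tuple supplies both `a` and `b`.
unpacked_error = try_bind(*pair)

print(single_arg_error)
print(unpacked_error)
```

Passing the tuple as one argument leaves `b` unfilled, which matches the explanation above: the mapped function receives one item per call, not two.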

Bob Colner

07/06/2020, 6:50 PM
yeah, my understanding is that the cross_product task returns a list of tuples:
[('d', 'c'),
 ('d', 'a'),
 ('d', 't'),
 ('o', 'c'),
 ('o', 'a'),
 ('o', 't'),
 ('g', 'c'),
 ('g', 'a'),
 ('g', 't')]
so for each task there are 2 strings available. In normal Python I would use the 'splat' * to unpack the tuple
I guess I assumed that the Prefect map knows how to do this (the GitHub issue does not use any *splat)
@Dylan do you have any ideas? I'm trying to implement the suggestion you gave me last week. thanks!
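For reference, the plain-Python 'splat' behaviour mentioned above can be sketched with `itertools.starmap`, which calls the function as `concat(*pair)` for each tuple. This is ordinary Python only and makes no claim about what Prefect's map does with tuples.

```python
import itertools


def concat(a: str, b: str) -> str:
    return a + b


a = ['d', 'o', 'g']
b = ['c', 'a', 't']

# All 9 (x, y) combinations, in the same order as the list shown above.
pairs = list(itertools.product(a, b))

# starmap unpacks each tuple into positional arguments: concat(*pair).
joined = list(itertools.starmap(concat, pairs))
print(joined)
```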


07/06/2020, 7:01 PM
@Bob Colner In the example above, you might need to adjust your flow to be:
with Flow(name='zip-map-test') as flow:
    cross_result = cross_product(a,b)
    result =
and move the unpacking of the tuple into the
Prefect defers computation to execution time, as opposed to definition time
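The suggestion above can be sketched in plain Python. This assumes that in the real flow both functions are Prefect tasks and that the list comprehension is replaced by the mapped call; neither is shown here, so the sketch runs without Prefect. The name `concat_pair` is hypothetical. The key point is that the function accepts one tuple and unpacks it internally, so each mapped call needs only a single argument.

```python
import itertools
from typing import List, Tuple


def cross_product(x: List[str], y: List[str]) -> List[Tuple[str, str]]:
    return list(itertools.product(x, y))


def concat_pair(pair: Tuple[str, str]) -> str:
    # Hypothetical variant of concat: takes ONE argument (the tuple)
    # and unpacks it inside the function body.
    a, b = pair
    return a + b


a = ['d', 'o', 'g']
b = ['c', 'a', 't']

cross_result = cross_product(a, b)

# Stand-in for the mapped call: one invocation per tuple.
result = [concat_pair(pair) for pair in cross_result]
print(result)
```

Because the unpacking happens inside the function, it runs only when each item is actually processed, which fits the point about computation being deferred to execution time.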

Bob Colner

07/06/2020, 7:18 PM
ok, cool. let me try this. FYI, this would be nice to add to the mapping docs
that worked. thanks