Bob Colner
07/06/2020, 6:40 PM
itertools.product function to map over multiple tasks.
However, I can't get this to work in a simple example:
import itertools
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor

@task(checkpoint=False)
def cross_product(x, y) -> list:
    return list(itertools.product(x, y))

@task(checkpoint=False)
def concat(a: str, b: str) -> str:
    return a + b

a = ['d', 'o', 'g']
b = ['c', 'a', 't']

with Flow(name='zip-map-test') as flow:
    result = concat.map(cross_product(a, b))

executor = LocalDaskExecutor(scheduler='threads')
flow_state = flow.run(executor)
TypeError: missing a required argument: 'b'
Kyle Moon-Wright
07/06/2020, 6:48 PM
concat function you're mapping is looking for two arguments, whereas you only have cross_product(a, b) as the first arg.
Bob Colner
07/06/2020, 6:50 PM
[('d', 'c'),
('d', 'a'),
('d', 't'),
('o', 'c'),
('o', 'a'),
('o', 't'),
('g', 'c'),
('g', 'a'),
('g', 't')]
So for each task there are 2 strings available. In normal Python I would use the 'splat' operator (*) to unpack the tuple.
Dylan
with Flow(name='zip-map-test') as flow:
    cross_result = cross_product(a, b)
    result = concat.map(cross_result)
and move the unpacking of the tuple into the concat task
Bob Colner
07/06/2020, 7:18 PM
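Dylan's suggestion of moving the tuple unpacking into concat can be sketched roughly as follows. This is a plain-Python illustration rather than a tested Prefect flow: the functions are left undecorated, and the built-in map is an assumed stand-in for concat.map(cross_result). In the real flow each function would presumably keep its @task(checkpoint=False) decorator.

```python
import itertools

def cross_product(x, y) -> list:
    # Every (x_i, y_j) pair, returned as a list of 2-tuples.
    return list(itertools.product(x, y))

def concat(pair: tuple) -> str:
    # Accept ONE argument (the tuple) and unpack it inside the function,
    # because a mapped call receives a single list element per invocation.
    a, b = pair
    return a + b

a = ['d', 'o', 'g']
b = ['c', 'a', 't']

pairs = cross_product(a, b)
# Built-in map stands in for concat.map(cross_result) in this sketch.
result = list(map(concat, pairs))
print(result)
```

With this shape, concat no longer has a second required parameter, which is what the "missing a required argument: 'b'" error was complaining about.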