
CM

06/21/2021, 3:57 PM
Hi everyone, I am trying to figure out how to work with a mapped flow with some of the tasks having multiple return values and running into some issues (details in the thread).
I have two tasks defined as follows:
@task(nout=3)
def taskA(v=None):
    return 'A', 'a', 'Aa'

@task
def taskB(v1, v2, v3):
    return ''.join([v1, v2, v3])
When I run the following flow, the results are as expected:
with Flow('testA') as flow:
    v1, v2, v3 = taskA()
    x = taskB(v1, v2, v3)
The result of taskB is 'AaAa'.
However, when I map over the tasks as follows:
with Flow('testA') as flow:
    v1, v2, v3 = taskA.map(range(5), task_args={'nout': 3})
    x = taskB.map(v1, v2, v3)
I get the result of taskB as ['AAA', 'aaa', 'AaAaAa'].
I was expecting to see ['AaAa', 'AaAa', 'AaAa', 'AaAa', 'AaAa']
also note that I had to add task_args={'nout': 3} to the map call as it appears the map call drops the original task attribute specified in the decorator

Kevin Kho

06/21/2021, 4:09 PM
Hey @CM, when you map taskA, you get a List[Tuple] back, like [('A', 'a', 'Aa'), ('A', 'a', 'Aa'), ('A', 'a', 'Aa'), …]. Unpacking with v1, v2, v3 = taskA.map(...) assumes a Tuple[List] structure, where v1, v2, and v3 are each lists. You need an intermediate task here to reshape the List[Tuple] into a Tuple[List].
Something like this will reshape it
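from typing import List, Tuple

from prefect import task

@task(nout=3)
def reshape(x: List[Tuple]) -> Tuple[List, List, List]:
    # Transpose the per-run tuples into one list per output position.
    out1 = [t[0] for t in x]
    out2 = [t[1] for t in x]
    out3 = [t[2] for t in x]
    return out1, out2, out3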
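To make the shape mismatch concrete, here is a minimal plain-Python sketch (no Prefect involved) that imitates what the mapped flow did. It assumes that unpacking the mapped result with nout=3 simply picks the first three elements of the list, which is consistent with the output reported above; `mapped_output`, `task_b`, and this zip-based `reshape` are stand-ins for illustration, not the library's code.

```python
# Stand-in for taskA mapped over range(5): a list of tuples (List[Tuple]).
mapped_output = [("A", "a", "Aa") for _ in range(5)]


def task_b(v1, v2, v3):
    """Plain-function stand-in for taskB."""
    return "".join([v1, v2, v3])


# Assumption: unpacking with nout=3 indexes the first three list elements,
# so each of v1, v2, v3 is one whole tuple rather than a list of values.
v1, v2, v3 = mapped_output[0], mapped_output[1], mapped_output[2]

# Mapping over three tuples pairs them up element-wise.
wrong = [task_b(a, b, c) for a, b, c in zip(v1, v2, v3)]


def reshape(rows):
    """Transpose List[Tuple] into Tuple[List], one list per output position."""
    return tuple(list(col) for col in zip(*rows))


r1, r2, r3 = reshape(mapped_output)
right = [task_b(a, b, c) for a, b, c in zip(r1, r2, r3)]

print(wrong)  # ['AAA', 'aaa', 'AaAaAa']
print(right)  # ['AaAa', 'AaAa', 'AaAa', 'AaAa', 'AaAa']
```

The transposed version has five entries per list, so the second map runs once per original input, which is the behaviour the question was after.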

CM

06/21/2021, 4:49 PM
Thanks @Kevin Kho -- that did the trick. One follow-up question to help me better understand the internals: would the reshape function load all of the tuples from storage into memory in order to transform the data? In my use case, the tuple output of each taskA run is small, but it maps over an iterable with many elements, so the entire structure would be too big to fit into memory. My original thinking was that taskA would map and generate the results of each run, and then taskB would pick up one tuple at a time and generate its result. Not sure how reshape would play into this.
By the way, this is the modified flow:
with Flow('testA') as flow:
    xx = taskA.map(range(5))
    v1, v2, v3 = reshape(xx)
    x = taskB.map(v1, v2, v3)
(reshape call is not mapped -- I assume that's what you meant)

Kevin Kho

06/21/2021, 4:52 PM
I think you’re right that this will have to load everything into memory for this operation. And yes, that’s right, reshape is not mapped. If this memory usage is too expensive, I think you would have to give up map and offload the operation to Dask instead.
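For comparison, the "one tuple at a time" idea from the question can be pictured with a task that accepts the whole tuple as a single argument, so no reshape step is needed. The sketch below is plain Python, with a generator standing in for the mapped tasks; `task_a`, `task_b_from_tuple`, and `pipeline` are hypothetical names, and whether a Prefect flow built this way avoids holding all results at once depends on the executor and result configuration, which this thread does not settle.

```python
from typing import Iterable, Iterator, Tuple


def task_a(v: int) -> Tuple[str, str, str]:
    """Stand-in for taskA: returns one small tuple per input."""
    return "A", "a", "Aa"


def task_b_from_tuple(t: Tuple[str, str, str]) -> str:
    """Hypothetical variant of taskB that unpacks a single tuple itself."""
    v1, v2, v3 = t
    return "".join([v1, v2, v3])


def pipeline(items: Iterable[int]) -> Iterator[str]:
    """Process inputs lazily: only one intermediate tuple exists at a time."""
    for v in items:
        yield task_b_from_tuple(task_a(v))


# Collected into a list here only to show the values.
results = list(pipeline(range(5)))
print(results)
```

In a flow, the analogous change would be to map a tuple-accepting taskB directly over the output of taskA.map(...), rather than unpacking into three separate arguments.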

CM

06/21/2021, 4:54 PM
Thanks

Kevin Kho

06/21/2021, 4:55 PM
Maybe you can also somehow take advantage of flatten or unmapped to lessen the memory footprint, but I wouldn’t be 100% sure this is possible from this simple example. Looking at this simple example though, if 'A', 'a', 'Aa' are constant throughout the next mapping operation, you can pass the list as unmapped instead so you don’t have to perform the first map.
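As a rough analogy for the unmapped suggestion: a constant argument is supplied once and reused for every mapped call, instead of being materialized as a list of identical values. The sketch below imitates that in plain Python with `itertools.repeat`; it is not Prefect's implementation, and `map_with_constants` is a hypothetical helper introduced only for illustration.

```python
from itertools import repeat


def task_b(v1, v2, v3):
    """Plain-function stand-in for taskB."""
    return "".join([v1, v2, v3])


def map_with_constants(func, n_calls, *constants):
    """Call func n_calls times, reusing each constant rather than copying it.

    repeat(c, n_calls) yields the same object n_calls times without
    building a list of n_calls elements.
    """
    return list(map(func, *(repeat(c, n_calls) for c in constants)))


results = map_with_constants(task_b, 5, "A", "a", "Aa")
print(results)
```

This only helps when the values really are the same for every item; if each taskA run produces different output, the constants cannot stand in for the first map.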