
Jonah Benton

08/23/2019, 12:54 AM
Hi folks, same context as my question above. I'm realizing that I misunderstand how task mapping/iterating currently works. On a first quick read of the Prefect docs, I assumed that given a fan-out structure:
results1 = task1.map(input_list)
results2 = task2.map(results1)
results3 = task3.map(results2)
where task1 and task2 return lists, a copy of task2 gets launched for every item in every list produced by task1, and similarly a copy of task3 gets launched for every item in every list produced by every instance of task2.
This doesn't seem to be the case; instead, a copy of task2 is launched for every instance of task1, and a copy of task3 is launched for every instance of task2. This means that task1 gets to process individual items, but task2 and task3 have to process lists of items.
Is there some simple magic, e.g. by making a custom Task class, where I can achieve the behavior I'm looking for, where individual tasks process individual items, and lists returned from tasks are automatically "flattened" by the Prefect machinery?

Jeremiah

08/23/2019, 12:57 AM
Hi @Jonah Benton, your first impression is correct: by mapping over mapped tasks, you should only be processing individual items all the way through the three tasks in the pipeline. For example, the second item of input_list gets passed to the second copy of task1, then reduced into results1 (a list of all results). task2 then maps over that list, such that the second copy of task2 gets only the result produced by the second copy of task1.
We usually illustrate this by mapping a function that operates on a scalar over a list:
from prefect import task, Flow

@task
def plus_1(x):
    # Each mapped copy receives one element, not the whole list
    print('received {}'.format(x))
    print('returning {}'.format(x + 1))
    return x + 1

with Flow('iterated mapping') as flow:
    inputs = [1, 2, 3]
    # One copy of plus_1 runs per element at every stage
    result1 = plus_1.map(inputs)
    result2 = plus_1.map(result1)
    result3 = plus_1.map(result2)

flow.run()

Jonah Benton

08/23/2019, 1:01 AM
Yes, I see that; however, in that case, the task plus_1 returns a scalar. I have tasks that have to return lists.
For instance, a task takes a twitter screen name and returns the list of users that name follows.
Then, a task that takes a followed-user-name, and returns a list of recent tweets.

Jeremiah

08/23/2019, 1:02 AM
I see — mapping always iterates over the “first” level of the mapped object. In your case, you may want to introduce a flatten task that takes the nested list-of-lists and turns it into a single list, which can then be mapped over
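A minimal sketch of that flatten step, written in plain Python so it runs without Prefect. The names flatten, get_followed, and get_tweets are hypothetical stand-ins for the Twitter tasks described above, and the list comprehensions only imitate what .map does (one call per top-level item). The Flow wiring in the trailing comments is an assumption based on the mapping API shown earlier in this thread, not a confirmed recipe.

```python
from itertools import chain


def flatten(nested):
    """Collapse a list of lists into a single flat list (one level only)."""
    return list(chain.from_iterable(nested))


# Hypothetical stand-ins for the tasks in this thread; no real API calls.
def get_followed(screen_name):
    return ["{}_friend{}".format(screen_name, i) for i in range(2)]


def get_tweets(user):
    return ["tweet by {}".format(user)]


# Imitates task.map(...): one call per top-level item of the input list.
followed_nested = [get_followed(name) for name in ["alice", "bob"]]
# followed_nested is a list of lists, so flatten it before the next map.
followed_flat = flatten(followed_nested)
tweets_nested = [get_tweets(user) for user in followed_flat]
tweets_flat = flatten(tweets_nested)

print(followed_flat)
print(tweets_flat)

# Assumed wiring inside a Flow, with flatten decorated by @task:
#   results1 = task1.map(input_list)
#   flat1 = flatten(results1)      # plain call, receives the full list of lists
#   results2 = task2.map(flat1)    # now one copy of task2 per individual item
```

Because flatten is called normally rather than mapped, it acts as the reduce step: it sees every list produced upstream at once and hands a single flat list to the next map.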

Jonah Benton

08/23/2019, 1:03 AM
Oh, ok, sure, a flatten can be done in a reduce step. Will do that.

Jeremiah

08/23/2019, 1:04 AM
Yes, sorry for misunderstanding at first!
We will add this to a list of “advanced mapping” use cases — others include things like batched mapping (process N items at a time)
Mapping has been a super useful feature, and we’re working out what mapping 2.0 could look like, so this is very helpful
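For the batched-mapping case, one possible workaround is to split the input into chunks of N items in an ordinary step and then map over the chunks. This is only a sketch in plain Python: batch is a hypothetical helper, not a Prefect feature, and each mapped task would then receive a list of up to N items instead of a single item.

```python
def batch(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


# Each chunk would become one mapped task run.
chunks = batch([1, 2, 3, 4, 5], 2)
print(chunks)
```

The last chunk may be shorter than the others, so a task that consumes chunks should not assume a fixed length.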

Jonah Benton

08/23/2019, 1:07 AM
Cool, yeah, agree, it is extremely valuable; being able to handle fan-out transparently is kind of a holy grail. Thanks very much again also for the responsiveness!

Jeremiah

08/23/2019, 1:08 AM
I hear you — if we can work out an API to autodetect the fan out, anything’s possible. And of course! Hit us up with any other questions, we want to make sure this is as easy as possible!