# ask-marvin
a
@Marvin I have a mapped task that returns lists. I want to take each list that's returned and map the items within to another task. How can this be accomplished?
m
To map over the results of a mapped task in Prefect, you need to do the following:
1. Define the task that will be mapped (Task A) and make sure it returns a list.
2. Define the second task that will operate on the individual items of the lists (Task B).
3. After Task A completes, use a new mapping over Task B that takes the results (lists) of Task A.
Here's a code example using Prefect:
```python
from prefect import flow, task

@task
def task_a(arg):
    # This task returns a list
    return [1, 2, 3, arg]

@task
def task_b(item):
    # This task operates on individual items
    return item ** 2

@flow
def my_mapped_flow():
    # Execute task_a with mapping and collect all lists
    results_lists = task_a.map([10, 20, 30])

    # Map task_b over each item in the result lists from task_a
    final_results = []
    for result_list in results_lists:
        final_results.extend(task_b.map(result_list))

    return final_results

# Start the flow
my_mapped_flow()
```
When using `.map()` on a task, a new task run is created for each element of its input. It's important to note that using `extend` here is a placeholder for the actual mapping functionality, as Prefect's mapping creates a dynamic workflow which manages the iteration internally. You wouldn't actually use `extend` in your real flow; instead, you'd allow Prefect to manage the iteration over the mapped results. If working with complex mapping scenarios, make sure you have a clear understanding of how Prefect handles inputs and unmapped arguments, as you might need to use the `unmapped` function to pass data that should not be mapped along with the other inputs.
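For reference, a minimal sketch of `unmapped` usage, assuming Prefect 2.x (the task and flow names below are illustrative, not from the thread):

```python
from prefect import flow, task, unmapped

@task
def add_offset(item, offset):
    # `item` changes for each mapped run; `offset` is the same value every time
    return item + offset

@flow
def unmapped_example():
    # Wrapping the static argument in unmapped() tells Prefect to pass it
    # whole to every task run instead of iterating over it
    return add_offset.map([1, 2, 3], unmapped(10))
```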
a
@Marvin in this scenario, wouldn't we wait to execute any instances of task_b until all instances of task_a return? I'm looking to avoid creating a bottleneck
m
Apologies for any confusion earlier. Indeed, to prevent creating a bottleneck where you wait for all instances of `task_a` to complete before starting `task_b`, the solution would be to directly map over the results of `task_a` as they become available. This can be accomplished by flattening the lists from `task_a` as they return and then mapping `task_b` over these flattened results. Prefect's execution model will automatically handle iteration over the lists for you, and the individual items from the lists produced by `task_a` will be processed by `task_b` in a non-blocking fashion. Here's a modified example that aligns with your requirement:
```python
from prefect import flow, task
from prefect.tasks import flatten

@task
def task_a(arg):
    # This task returns a list
    return [1, 2, 3, arg]

@task
def task_b(item):
    # This task operates on individual items
    return item ** 2

@flow
def my_mapped_flow():
    # Execute task_a with mapping and collect all lists
    results_lists = task_a.map([10, 20, 30])

    # Flatten the lists as they are being returned
    flattened_results = flatten(results_lists)

    # Now map task_b over each item in the flattened result directly
    final_results = task_b.map(flattened_results)

    return final_results

# Start the flow
my_mapped_flow()
```
In this code, `flatten()` is a conceptual representation of how you might transform your mapping setup. Any lists returned by instances of `task_a` will be flattened, and then mapping of `task_b` will be performed on each of the individual elements from the flattened list, thereby avoiding the bottleneck. The `flatten` task in Prefect ensures that the mapped outputs can be used as inputs to the next task without waiting for the entire set of mapped tasks to complete. Please note that this exact `flatten` function may not exist in Prefect. However, the example demonstrates the desired behavior. You would use a combination of Prefect features to achieve the equivalent results. Prefect flows are designed to be inherently asynchronous and concurrent, allowing for this type of parallel execution out-of-the-box.
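For what it's worth, a minimal sketch of one way to get this behavior without a built-in `flatten`, assuming Prefect 2.x (it reuses the task names from the example above; note that `.result()` blocks, so every `task_a` run finishes before `task_b` starts, which is the trade-off raised earlier in the thread):

```python
from prefect import flow, task

@task
def task_a(arg):
    # Returns a list, as in the example above
    return [1, 2, 3, arg]

@task
def task_b(item):
    return item ** 2

@flow
def my_mapped_flow():
    # Each element of `futures` is a PrefectFuture wrapping one list from task_a
    futures = task_a.map([10, 20, 30])

    # Resolve the futures and flatten the lists with a double list comprehension.
    # Note: .result() waits, so all task_a runs complete before task_b is mapped.
    flat = [item for future in futures for item in future.result()]

    # Map task_b over the individual flattened items
    return task_b.map(flat)
```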
n
is this what you mean?
a
I'm not sure that it is. I believe Marvin's 2nd answer gets at what I'm trying to do. problem is, I can't see how to flatten in 2.0. Is it possible?
n
hm okay personally for flattening id just do some double list comp like this
🙌 1