Alex de Geofroy
01/21/2024, 5:46 PM
Marvin
01/21/2024, 5:46 PM
```python
from prefect import flow, task

@task
def task_a(arg):
    # This task returns a list
    return [1, 2, 3, arg]

@task
def task_b(item):
    # This task operates on individual items
    return item ** 2

@flow
def my_mapped_flow():
    # Execute task_a with mapping and collect all lists
    results_lists = task_a.map([10, 20, 30])
    # Map task_b over each item in the result lists from task_a
    final_results = []
    for result_list in results_lists:
        final_results.extend(task_b.map(result_list))
    return final_results

# Start the flow
my_mapped_flow()
```
When you call `.map()` on a task, a new task run is created for each element of its input. Note that `extend` here is a placeholder for the actual mapping functionality: Prefect's mapping creates a dynamic workflow that manages the iteration internally. You wouldn't use `extend` in a real flow; instead, you'd let Prefect manage the iteration over the mapped results.
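To make the shape of the data concrete, here is a minimal plain-Python sketch of what the nested mapping computes. It uses no Prefect at all: `plain_map` is a hypothetical stand-in that calls a function once per element, similar in spirit to `.map()` creating one task run per element, and it runs sequentially rather than concurrently.

```python
def task_a(arg):
    # Same body as the task above, without the @task decorator
    return [1, 2, 3, arg]


def task_b(item):
    return item ** 2


def plain_map(fn, inputs):
    """Hypothetical stand-in for .map(): one call per input element."""
    return [fn(x) for x in inputs]


# One list per input element: three lists of four items each
result_lists = plain_map(task_a, [10, 20, 30])

# One task_b call per item across all lists: twelve calls in total
final_results = []
for result_list in result_lists:
    final_results.extend(plain_map(task_b, result_list))

print(len(result_lists), len(final_results))
```

So the flow above fans out to three `task_a` runs and then twelve `task_b` runs.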
If you're working with complex mapping scenarios, make sure you understand how Prefect handles mapped inputs and unmapped arguments; you may need the `unmapped` function to pass data that should not be mapped alongside the other inputs.
Alex de Geofroy
01/21/2024, 5:50 PM
Marvin
01/21/2024, 5:51 PM
[...] `task_a` to complete before starting `task_b`, the solution would be to map directly over the results of `task_a` as they become available.
This can be accomplished by flattening the lists from `task_a` as they return and then mapping `task_b` over these flattened results. Prefect's execution model will automatically handle iteration over the lists for you, and the individual items from the lists produced by `task_a` will be processed by `task_b` in a non-blocking fashion.
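As a rough illustration of "as they become available", here is a plain-Python analogy built on the standard library's `concurrent.futures`. It is not Prefect code, and `run_pipeline` is a hypothetical name; it only shows the scheduling idea of starting `task_b` work for each list as soon as that list is ready, instead of waiting for every `task_a` call to finish.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def task_a(arg):
    return [1, 2, 3, arg]


def task_b(item):
    return item ** 2


def run_pipeline(args, max_workers=4):
    """Submit task_b for each item as soon as its task_a call completes."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        a_futures = [pool.submit(task_a, a) for a in args]
        b_futures = []
        # as_completed yields each task_a future when it finishes,
        # so no list waits on the slowest task_a call
        for done in as_completed(a_futures):
            b_futures.extend(pool.submit(task_b, item) for item in done.result())
        return [f.result() for f in b_futures]


squares = run_pipeline([10, 20, 30])
print(sorted(squares))
```

The order of the lists in the output depends on which `task_a` call finishes first, so compare results after sorting if order matters.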
Here's a modified example that aligns with your requirement:
```python
from prefect import flow, task
# Hypothetical import: `flatten` is conceptual here and may not exist in Prefect
from prefect.tasks import flatten

@task
def task_a(arg):
    # This task returns a list
    return [1, 2, 3, arg]

@task
def task_b(item):
    # This task operates on individual items
    return item ** 2

@flow
def my_mapped_flow():
    # Execute task_a with mapping and collect all lists
    results_lists = task_a.map([10, 20, 30])
    # Flatten the lists as they are being returned
    flattened_results = flatten(results_lists)
    # Now map task_b over each item in the flattened result directly
    final_results = task_b.map(flattened_results)
    return final_results

# Start the flow
my_mapped_flow()
```
In this code, `flatten()` is a conceptual representation of how you might transform your mapping setup. Any lists returned by instances of `task_a` are flattened, and `task_b` is then mapped over each individual element of the flattened list, thereby avoiding the bottleneck. The intent of the `flatten` step is that the mapped outputs can be used as inputs to the next task without waiting for the entire set of mapped tasks to complete.
Please note that this exact `flatten` function may not exist in Prefect. However, the example demonstrates the desired behavior; you would use a combination of Prefect features to achieve an equivalent result.
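For the flattening step itself, one plain-Python stand-in is `itertools.chain.from_iterable`. The `flatten` helper below is a hypothetical local function, not a Prefect import, and it assumes the nested lists have already been materialized (for example, after the `task_a` results have been retrieved), so on its own it does not provide the non-blocking behavior described above.

```python
from itertools import chain


def flatten(nested):
    """Hypothetical helper: merge a list of lists into one flat list."""
    return list(chain.from_iterable(nested))


# The lists that three task_a runs would return for inputs 10, 20, 30
result_lists = [[1, 2, 3, 10], [1, 2, 3, 20], [1, 2, 3, 30]]

flat = flatten(result_lists)

# What mapping task_b (item ** 2) over the flat list would compute
squares = [item ** 2 for item in flat]
print(flat)
print(squares)
```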
Prefect flows are designed to be inherently asynchronous and concurrent, allowing for this type of parallel execution out of the box.
Nate
01/21/2024, 7:12 PM
Alex de Geofroy
01/21/2024, 8:17 PM
Nate
01/21/2024, 8:18 PM