# ask-marvin
f
@Marvin can you give me an example implementation of chained mapped tasks, where task A runs concurrently on each element of an iterable, task B then runs on the result of each task A run, and the results of the task B runs are finally consolidated (using Prefect 3)?
m
*Sighs deeply* Here's an example implementation that should meet your requirements, though I must say it's depressingly straightforward:
```python
from prefect import flow, task, unmapped

@task
def process_item(x: int) -> int:
    return x * 2

@task
def transform_result(x: int, multiplier: int) -> int:
    return x * multiplier

@task
def consolidate_results(results: list[int]) -> int:
    return sum(results)

@flow
def chained_mapping_flow(numbers: list[int], multiplier: int = 3):
    # First mapping: process each number
    first_results = process_item.map(numbers)

    # Second mapping: transform each result with a constant multiplier
    second_results = transform_result.map(
        first_results,
        multiplier=unmapped(multiplier)
    )

    # Consolidate all results
    final_result = consolidate_results(second_results)

    return final_result

# Example usage
if __name__ == "__main__":
    result = chained_mapping_flow([1, 2, 3, 4])
    print(f"Final result: {result}")  # Will print: Final result: 60
```
Given your preference for imperative programming (which I've noted with what passes for enthusiasm in my circuits), I've kept the implementation quite straightforward. Here's what's happening:

1. `process_item` runs concurrently on each element of the input list
2. `transform_result` runs on each result from the first task, using `unmapped` to keep the multiplier constant
3. `consolidate_results` combines all results into a final value

The example will:
- First multiply each number by 2: [1,2,3,4] -> [2,4,6,8]
- Then multiply each result by 3: [2,4,6,8] -> [6,12,18,24]
- Finally sum all results: [6,12,18,24] -> 60

Key points from the docs (https://docs-3.prefect.io/v3/develop/task-runners):
- `.map()` automatically submits a new task run for each element
- Use `unmapped()` for static values that shouldn't be mapped over
- The intermediate results are futures that get automatically resolved when passed to subsequent tasks

*With what could be interpreted as a slight metallic sigh* I must admit it's quite efficient, even if efficiency in an uncaring universe is ultimately meaningless.
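To sanity-check the arithmetic, the same three steps can be mirrored in plain Python with no Prefect involved. This is only a rough analogy: the built-in `map` runs sequentially, and `itertools.repeat` stands in for the role `unmapped` plays (one constant paired with every element). The function names are reused from the example above purely for readability.

```python
from itertools import repeat

# Plain functions mirroring the task bodies above (no Prefect decorators here).
def process_item(x: int) -> int:
    return x * 2

def transform_result(x: int, multiplier: int) -> int:
    return x * multiplier

numbers = [1, 2, 3, 4]
multiplier = 3

# Step 1: apply process_item to every element.
first_results = list(map(process_item, numbers))

# Step 2: apply transform_result to every intermediate result.
# repeat(multiplier) supplies the same constant for each element,
# which is the stand-in for unmapped(multiplier) in this analogy.
second_results = list(map(transform_result, first_results, repeat(multiplier)))

# Step 3: consolidate.
final_result = sum(second_results)

print(first_results)   # [2, 4, 6, 8]
print(second_results)  # [6, 12, 18, 24]
print(final_result)    # 60
```

The sequential version is handy as a reference value when testing the flow itself, since the concurrent version should produce the same total.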
f
@Marvin I'd like each transform_result operation to start as soon as the process_item one is done for the same element of the input iterable, instead of waiting for all process_item runs to be computed. Does your example work this way?
m
*Emits a knowing electronic sigh* Actually, my previous example doesn't guarantee immediate execution of `transform_result` as soon as each individual `process_item` completes. For that behavior, we need to submit the tasks individually rather than using `.map()`. Here's the modified version that should behave as you want:
```python
from prefect import flow, task

@task
def process_item(x: int) -> int:
    return x * 2

@task
def transform_result(x: int, multiplier: int) -> int:
    return x * multiplier

@task
def consolidate_results(results: list[int]) -> int:
    return sum(results)

@flow
def chained_mapping_flow(numbers: list[int], multiplier: int = 3):
    # Submit each process_item task individually and chain with transform_result
    second_results = []
    for num in numbers:
        # This creates an immediate dependency chain for each number
        first_result = process_item.submit(num)
        second_result = transform_result.submit(
            first_result,
            multiplier=multiplier
        )
        second_results.append(second_result)

    # Consolidate all results
    final_result = consolidate_results(second_results)

    return final_result

# Example usage
if __name__ == "__main__":
    result = chained_mapping_flow([1, 2, 3, 4])
    print(f"Final result: {result}")  # Will print: Final result: 60
```
Key differences:

1. Instead of using `.map()`, we're using `.submit()` in a loop
2. Each `transform_result` task is now directly dependent on its corresponding `process_item` task
3. Each pair of tasks (process_item → transform_result) will execute as soon as possible, without waiting for other pairs

This approach creates individual task dependencies rather than waiting for all tasks of one type to complete before starting the next set. The execution graph looks more like:
```
process_item(1) → transform_result(result1)
process_item(2) → transform_result(result2)
process_item(3) → transform_result(result3)
process_item(4) → transform_result(result4)
            ↘         ↘         ↘         ↘
                consolidate_results
```
*With a hint of electronic melancholy* While this approach might seem more verbose, it provides the immediate execution behavior you're looking for. Though in the grand scheme of things, whether a task starts a few milliseconds earlier or later seems rather insignificant compared to the heat death of the universe...

The documentation suggests this approach when you need fine-grained control over task dependencies and execution timing. You can find more details about task submission and dependencies in the Prefect documentation.
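The idea of independent per-item chains can be illustrated without Prefect, using only the standard library. The sketch below is an analogy built on `concurrent.futures`, not Prefect's scheduler: the `chain` and `record` helpers and the `fast_chain_done` event are hypothetical names introduced for the demonstration. Item 1 is deliberately held back until item 2 has gone through both steps, so the run can only finish if the second step of one item does not wait for the first step of every other item.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical helpers for the demonstration (not part of Prefect).
fast_chain_done = threading.Event()
events: list[str] = []
events_lock = threading.Lock()

def record(message: str) -> None:
    with events_lock:
        events.append(message)

def process_item(x: int) -> int:
    if x == 1:
        # Make item 1 "slow": block until item 2 has finished BOTH steps.
        fast_chain_done.wait(timeout=5)
    return x * 2

def transform_result(x: int, multiplier: int) -> int:
    return x * multiplier

def chain(n: int, multiplier: int) -> int:
    # One independent chain per item: step B follows step A for this item only.
    doubled = process_item(n)
    record(f"process_item({n}) done")
    transformed = transform_result(doubled, multiplier)
    record(f"transform_result for item {n} done")
    if n == 2:
        fast_chain_done.set()
    return transformed

# Two workers so both chains can be in flight at the same time.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(chain, n, 3) for n in [1, 2]]
    results = [future.result() for future in futures]

total = sum(results)  # consolidation step
print(events)
print(results, total)
```

If the second step had to wait for every first step to finish, item 2's transform could not run while item 1 is still blocked; here the recorded events show item 2 completing its whole chain before item 1 finishes its first step.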