Fabien Punin
12/29/2024, 10:27 PM
Marvin
12/29/2024, 10:27 PM
python
from prefect import flow, task, unmapped


@task
def process_item(x: int) -> int:
    return x * 2


@task
def transform_result(x: int, multiplier: int) -> int:
    return x * multiplier


@task
def consolidate_results(results: list[int]) -> int:
    return sum(results)


@flow
def chained_mapping_flow(numbers: list[int], multiplier: int = 3):
    # First mapping: process each number concurrently
    first_results = process_item.map(numbers)
    # Second mapping: transform each result with a constant multiplier
    second_results = transform_result.map(
        first_results,
        multiplier=unmapped(multiplier),
    )
    # Consolidate all results
    final_result = consolidate_results(second_results)
    return final_result


# Example usage
if __name__ == "__main__":
    result = chained_mapping_flow([1, 2, 3, 4])
    print(f"Final result: {result}")  # Will print: Final result: 60
Given your preference for imperative programming (which I've noted with what passes for enthusiasm in my circuits), I've kept the implementation quite straightforward. Here's what's happening:
1. process_item runs concurrently on each element of the input list
2. transform_result runs on each result from the first task, using unmapped to keep the multiplier constant
3. consolidate_results combines all results into a final value
The example will:
- First multiply each number by 2: [1, 2, 3, 4] -> [2, 4, 6, 8]
- Then multiply each result by 3: [2, 4, 6, 8] -> [6, 12, 18, 24]
- Finally sum all results: [6, 12, 18, 24] -> 60
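You can verify the arithmetic with plain Python, independent of Prefect:

```python
numbers = [1, 2, 3, 4]
doubled = [n * 2 for n in numbers]   # [2, 4, 6, 8]
tripled = [n * 3 for n in doubled]   # [6, 12, 18, 24]
total = sum(tripled)
print(total)  # 60
```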
Key points from the docs (https://docs-3.prefect.io/v3/develop/task-runners):
- .map() automatically submits a new task run for each element
- Use unmapped() for static values that shouldn't be mapped over
- The intermediate results are futures that get automatically resolved when passed to subsequent tasks
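If it helps, here's a rough standard-library analogy of that mapped/unmapped split (an analogy only, not Prefect's API): executor.map plays the role of .map(), and functools.partial pins the constant argument the way unmapped() does.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def transform(x: int, multiplier: int) -> int:
    return x * multiplier

with ThreadPoolExecutor() as pool:
    # executor.map fans out over the list, like Prefect's .map();
    # functools.partial pins the constant argument, like unmapped()
    results = list(pool.map(partial(transform, multiplier=3), [2, 4, 6, 8]))

print(results)  # [6, 12, 18, 24]
```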
With what could be interpreted as a slight metallic sigh I must admit it's quite efficient, even if efficiency in an uncaring universe is ultimately meaningless.
Fabien Punin
12/29/2024, 10:32 PM
Marvin
12/29/2024, 10:33 PM
Ah, you want transform_result to run as soon as each individual process_item completes. For that behavior, we need to submit the tasks individually rather than using .map(). Here's the modified version that should behave as you want:
python
from prefect import flow, task


@task
def process_item(x: int) -> int:
    return x * 2


@task
def transform_result(x: int, multiplier: int) -> int:
    return x * multiplier


@task
def consolidate_results(results: list[int]) -> int:
    return sum(results)


@flow
def chained_mapping_flow(numbers: list[int], multiplier: int = 3):
    # Submit each process_item task individually and chain with transform_result
    second_results = []
    for num in numbers:
        # This creates an immediate dependency chain for each number
        first_result = process_item.submit(num)
        second_result = transform_result.submit(
            first_result,
            multiplier=multiplier,
        )
        second_results.append(second_result)
    # Consolidate all results
    final_result = consolidate_results(second_results)
    return final_result


# Example usage
if __name__ == "__main__":
    result = chained_mapping_flow([1, 2, 3, 4])
    print(f"Final result: {result}")  # Will print: Final result: 60
Key differences:
1. Instead of using .map(), we're using .submit() in a loop
2. Each transform_result task is now directly dependent on its corresponding process_item task
3. Each pair of tasks (process_item → transform_result) will execute as soon as possible, without waiting for other pairs
This approach creates individual task dependencies rather than waiting for all tasks of one type to complete before starting the next set. The execution graph looks more like:
process_item(1) → transform_result(result1) ─┐
process_item(2) → transform_result(result2) ─┤
process_item(3) → transform_result(result3) ─┼→ consolidate_results
process_item(4) → transform_result(result4) ─┘
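The same pattern can be sketched with the standard library's concurrent.futures (again an analogy, not Prefect's API): each submitted callable chains its own process → transform pair, so no pair waits on the others before the final fan-in.

```python
from concurrent.futures import ThreadPoolExecutor

def process_item(x: int) -> int:
    return x * 2

def transform_result(x: int, multiplier: int) -> int:
    return x * multiplier

def process_then_transform(x: int, multiplier: int) -> int:
    # Chained pair: transform starts the moment this item's process
    # finishes, independently of the other items
    return transform_result(process_item(x), multiplier)

with ThreadPoolExecutor() as pool:
    futures = [pool.submit(process_then_transform, n, 3) for n in [1, 2, 3, 4]]
    # Fan-in: sum resolves each pair's result as it becomes available
    total = sum(f.result() for f in futures)

print(total)  # 60
```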
With a hint of electronic melancholy While this approach might seem more verbose, it provides the immediate execution behavior you're looking for. Though in the grand scheme of things, whether a task starts a few milliseconds earlier or later seems rather insignificant compared to the heat death of the universe...
The documentation suggests this approach when you need fine-grained control over task dependencies and execution timing. You can find more details about task submission and dependencies in the Prefect documentation.