David Martin Calalang
07/10/2025, 9:25 PM

Marvin
07/10/2025, 9:26 PM

Marvin
07/10/2025, 9:27 PM

`wait()` and its usage with mapped tasks:
The `wait()` function from `prefect.futures` is handy when working with mapped tasks because it pauses execution until your mapped task runs have finished. Reach for it when you need to:
1. Ensure all parallel tasks finish before proceeding
2. Collect results from mapped tasks
3. Handle timeouts for long-running parallel operations
Here's a practical example:
```python
from prefect import flow, task
from prefect.futures import wait


@task
def process_item(x: int) -> int:
    return x * 2


@flow
def my_flow(numbers: list[int]) -> list[int]:
    # .map() submits one task run per item and returns a list of futures
    futures = process_item.map(numbers)

    # Block until every future has finished; returns a (done, not_done) named tuple
    done, not_done = wait(futures)

    # You can have code after wait()!
    print(f"Completed tasks: {len(done)}")
    print(f"Incomplete tasks: {len(not_done)}")

    # `done` is a set, so walk the original list to keep input order.
    # .result() re-raises if that task run failed.
    results = [f.result() for f in futures if f in done]
    return results


if __name__ == "__main__":
    result = my_flow([1, 2, 3, 4, 5])
```
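To try the `done`/`not_done` pattern without a Prefect install, here is the same logic written against the standard library's `concurrent.futures.wait`, which uses the same two-set return shape. This is a thread-pool analogy, not Prefect code: `run` is a hypothetical helper and `process_item` is a plain function standing in for the task.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def process_item(x: int) -> int:
    # Plain function standing in for the Prefect task (analogy only)
    return x * 2


def run(numbers: list[int]) -> list[int]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # One future per input, similar in spirit to .map()
        futures = [pool.submit(process_item, n) for n in numbers]
        # Blocks until every future finishes; returns a named tuple of two sets
        done, not_done = wait(futures)
        print(f"Completed tasks: {len(done)}")
        print(f"Incomplete tasks: {len(not_done)}")
        # Iterate the original list so results keep input order (sets are unordered)
        return [f.result() for f in futures if f in done]


print(run([1, 2, 3, 4, 5]))  # [2, 4, 6, 8, 10]
```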
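Key points about `wait()`:
1. `wait(futures)` returns a named tuple of two sets (the `.wait()` method on the futures list also blocks, but it returns `None`, so it can't be unpacked):
   - `done`: futures that have finished, whether they completed, failed, or were cancelled
   - `not_done`: futures that haven't finished yet (only possible if using a timeout)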
2. You can absolutely have code after `wait()`! This is a common pattern when you need to:
   - Process results from completed tasks
   - Handle any failures or timeouts
   - Continue with subsequent operations that depend on the mapped results
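3. You can specify a timeout in seconds. When it expires, `wait()` returns instead of raising, and any unfinished futures end up in `not_done`:
   ```python
   # Wait up to 60 seconds for completion
   done, not_done = wait(futures, timeout=60)
   ```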
4. If you don't need the separate `done`/`not_done` sets, you can call `result()` on the futures list to get all results directly:
   ```python
   results = futures.result()  # Waits for all futures; returns results in input order (raises if a task failed)
   ```
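One thing worth checking before collecting results: in the standard library's `concurrent.futures.wait`, whose done/not_done contract Prefect's `wait` appears to follow ("finished or cancelled"), a future that raised still counts as done, and calling `.result()` on it re-raises the error. The thread-pool analogy below uses a hypothetical `maybe_fail` task and `demo_failures` helper to show that, and it walks the original `futures` list so outcomes keep their input order.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def maybe_fail(x: int) -> int:
    # Hypothetical task that fails for one input
    if x == 3:
        raise ValueError(f"bad input: {x}")
    return x * 2


def demo_failures() -> tuple[int, int, list[str]]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(maybe_fail, n) for n in [1, 2, 3, 4]]
        done, not_done = wait(futures)
        outcomes = []
        for f in futures:  # original order, not set order
            try:
                outcomes.append(str(f.result()))
            except ValueError as exc:
                # The failed future is in `done`; .result() re-raises its error
                outcomes.append(f"failed: {exc}")
        return len(done), len(not_done), outcomes


print(demo_failures())  # (4, 0, ['2', '4', 'failed: bad input: 3', '8'])
```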
Remember that `wait()` is blocking: it will pause execution until either all tasks complete or the timeout is reached (if specified). This is exactly what you want when you need to ensure all parallel operations are finished before moving forward with your workflow.
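The blocking-with-timeout behavior can be reproduced with the standard library's `concurrent.futures.wait`, which uses the same done/not_done shape. In this sketch (hypothetical names `slow_item` and `demo_timeout`, and a thread pool rather than Prefect), each worker blocks on a `threading.Event`, so `wait(..., timeout=0.1)` returns with both futures in `not_done` and no exception; releasing the event then lets `.result()` collect the values.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def slow_item(x: int, release: threading.Event) -> int:
    # Simulates a long-running task: blocks until `release` is set
    release.wait()
    return x * 2


def demo_timeout() -> tuple[tuple[int, int], list[int]]:
    release = threading.Event()
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(slow_item, n, release) for n in (1, 2)]
        # Returns after ~0.1 s; nothing has finished, and no exception is raised
        done, not_done = wait(futures, timeout=0.1)
        counts = (len(done), len(not_done))
        # Let the workers finish, then block for the remaining results
        release.set()
        results = [f.result() for f in futures]
    return counts, results


print(demo_timeout())  # ((0, 2), [2, 4])
```

The `Event` keeps the demo deterministic; a `time.sleep` inside the task would race against the timeout.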