# ask-marvin
@Marvin I'm using the .map() function in Prefect, can you explain the wait() function and when I should use it? Can I have code after wait()?
Here's how `wait()` works with mapped tasks. In Prefect 3 there are two forms: the `prefect.futures.wait()` function, which blocks and returns `done`/`not_done` sets, and the `.wait()` method on the futures list that `.map()` returns, which blocks the same way but returns `None`. Either one pauses the flow until the mapped task runs finish. This is useful when you need to:

1. Ensure all parallel tasks finish before proceeding
2. Collect results from mapped tasks
3. Handle timeouts for long-running parallel operations

Here's a practical example:
```python
from prefect import flow, task
from prefect.futures import wait


@task
def process_item(x: int) -> int:
    return x * 2


@flow
def my_flow(numbers: list[int]) -> list[int]:
    # .map() submits one task run per item and returns a list of futures
    futures = process_item.map(numbers)

    # wait() blocks until every future finishes, then returns two sets
    # (futures.wait() would also block, but it returns None)
    done, not_done = wait(futures)

    # You can have code after wait()!
    print(f"Completed tasks: {len(done)}")
    print(f"Incomplete tasks: {len(not_done)}")

    # `done` is an unordered set, so walk the original list to keep input order
    results = [f.result() for f in futures if f in done]
    return results


if __name__ == "__main__":
    result = my_flow([1, 2, 3, 4, 5])
```
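Prefect's `wait()` returns the same `(done, not_done)` shape as the standard library's `concurrent.futures.wait`. To see the timeout behavior without running a Prefect flow, here is a stdlib-only sketch. The thread pool stands in for Prefect's task runner, and `process_item`/`run` are hypothetical names for illustration, not Prefect internals.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Optional


def process_item(x: int) -> int:
    # Hypothetical stand-in for a task; item 5 is deliberately slow
    time.sleep(1.0 if x == 5 else 0.01)
    return x * 2


def run(numbers: list[int], timeout: Optional[float]) -> tuple[int, int]:
    # One worker per item so every "task" starts immediately
    with ThreadPoolExecutor(max_workers=len(numbers)) as pool:
        futures = [pool.submit(process_item, n) for n in numbers]
        # Block until every future finishes or the timeout elapses
        done, not_done = wait(futures, timeout=timeout)
        # Code after wait() runs as soon as wait() returns
        # (leaving the `with` block still waits for stragglers to exit)
        return len(done), len(not_done)


print(run([1, 2, 3, 4, 5], timeout=None))  # all five finish
print(run([1, 2, 3, 4, 5], timeout=0.5))   # the slow item is still in not_done
```

Note that `wait` only stops waiting when the timeout expires; it does not cancel the unfinished work.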
Key points about `wait()`:

1. `wait(futures)` returns two sets:
   - `done`: futures that reached a final state (completed, failed, or cancelled)
   - `not_done`: futures that haven't finished yet (only possible if you pass a timeout)

   A failed future is still in `done`, and calling `.result()` on it raises the task's exception by default.
2. You can absolutely have code after `wait()`! This is a common pattern when you need to:
   - Process results from completed tasks
   - Handle any failures or timeouts
   - Continue with subsequent operations that depend on the mapped results
3. You can specify a timeout:

```python
# Wait up to 60 seconds; anything still running ends up in not_done
done, not_done = wait(futures, timeout=60)
```
4. If you don't need the separate `done`/`not_done` sets, call `result()` on the mapped futures to wait for all of them and get their results as a list, in input order:

```python
results = futures.result()  # Waits for all futures; raises if a task failed
```
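Because `done` is a set, iterating it gives results in no guaranteed order, while a list-style `result()` call keeps them aligned with the inputs. A stdlib-only sketch of the ordered pattern, using `concurrent.futures` as a stand-in for Prefect futures (`double` and `ordered_results` are hypothetical helpers):

```python
from concurrent.futures import ThreadPoolExecutor, wait


def double(x: int) -> int:
    return x * 2


def ordered_results(numbers: list[int]) -> list[int]:
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(double, n) for n in numbers]
        done, _ = wait(futures)
        # Walk the original list, not the `done` set, so output order
        # matches input order
        return [f.result() for f in futures if f in done]


print(ordered_results([1, 2, 3, 4, 5]))  # [2, 4, 6, 8, 10]
```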
Remember that `wait()` is blocking: it pauses the flow until every task finishes or the timeout (if specified) expires. That is exactly what you want when the rest of your workflow depends on all of the parallel work being finished.
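The point right after `wait()` returns is also where you would handle failures. In Prefect you might inspect each future's `state` (for example `state.is_failed()`), assuming Prefect 3's `PrefectFuture.state` property. The stdlib sketch below shows the same partition pattern with `Future.exception()`, which returns the error instead of raising it; `risky` and `partition` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def risky(x: int) -> int:
    # Hypothetical task that fails on odd inputs
    if x % 2:
        raise ValueError(f"bad input {x}")
    return x * 10


def partition(numbers: list[int]) -> tuple[list[int], list[str]]:
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(risky, n) for n in numbers]
        done, _ = wait(futures)  # nothing below runs until all have finished
        succeeded: list[int] = []
        failed: list[str] = []
        for f in futures:  # keep input order
            if f not in done:
                continue
            exc = f.exception()  # returns None on success, never raises here
            if exc is None:
                succeeded.append(f.result())
            else:
                failed.append(str(exc))
        return succeeded, failed


ok, errors = partition([1, 2, 3, 4])
print(ok)      # [20, 40]
print(errors)  # ['bad input 1', 'bad input 3']
```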