Demid
08/06/2025, 11:59 AMMarvin
08/06/2025, 11:59 AMMarvin
08/06/2025, 12:01 PMas_completed
and wait
with async tasks in Prefect 3.x. I'll create comprehensive examples that demonstrate both patterns:
python
from prefect import flow, task
from prefect.futures import as_completed, wait
import asyncio
@task
async def async_task(number: int) -> int:
await asyncio.sleep(number) # Simulate varying completion times
return number * 2
@flow
async def example_as_completed_flow():
# Create multiple async task runs
futures = [
async_task.submit(i) for i in [3, 1, 4, 2]
]
# Use as_completed to process results as they arrive
async for future in as_completed(futures):
result = await future.result()
print(f"Task completed with result: {result}")
@flow
async def example_wait_flow():
# Create multiple async task runs
futures = [
async_task.submit(i) for i in [3, 1, 4, 2]
]
# Wait for all futures to complete
done, not_done = await wait(futures)
# Process completed futures
for future in done:
result = await future.result()
print(f"Completed task result: {result}")
# Check if any tasks didn't complete (e.g., due to timeout)
for future in not_done:
print(f"Task did not complete: {future}")
# Example with timeout
@flow
async def example_timeout_flow():
futures = [
async_task.submit(i) for i in [1, 5, 2]
]
# Wait with timeout
done, not_done = await wait(futures, timeout=3.0)
print("Completed within timeout:")
for future in done:
result = await future.result()
print(f"Result: {result}")
print("\nNot completed within timeout:")
for future in not_done:
print(f"Pending task: {future}")
# Run the flows
if __name__ == "__main__":
import asyncio
# Run as_completed example
asyncio.run(example_as_completed_flow())
# Run wait example
asyncio.run(example_wait_flow())
# Run timeout example
asyncio.run(example_timeout_flow())
Let's break down the key points:
1. as_completed Usage:
- as_completed
returns an async iterator that yields futures as they complete
- It's useful when you want to process results as soon as they're available
- The order of results will be based on completion time, not submission order
2. wait Usage:
- wait
returns two sets: done
and not_done
- It blocks until either all futures complete or the timeout is reached
- Useful when you need to ensure all tasks are complete before proceeding
- Can specify a timeout to avoid waiting indefinitely
3. Key Features:
- Both methods work with PrefectFuture
objects returned by .submit()
- You can await the .result()
method on futures to get the actual return value
- Timeout parameters help manage long-running tasks
Expected output for the example_as_completed_flow
might look like:
Task completed with result: 2 # (1 second task)
Task completed with result: 4 # (2 second task)
Task completed with result: 6 # (3 second task)
Task completed with result: 8 # (4 second task)
For the timeout example, tasks taking longer than 3 seconds will appear in the not_done
set.
Remember that:
- Always use async/await
syntax appropriately
- The futures must be created using .submit()
on tasks
- Error handling should be added in production code
- These patterns are particularly useful when managing multiple concurrent tasks with different completion times
The examples above demonstrate the most common usage patterns, but you can adapt them based on your specific needs, such as adding error handling or combining them with other async patterns.