<@ULVA73B9P> Hi, can you show me please some examp...
# ask-marvin
d
@Marvin Hi, could you please show me some examples of how prefect-futures#as-completed and prefect-futures#wait work with async tasks?
m
Here are practical examples of using both `as_completed` and `wait` with async tasks in Prefect 3.x. They demonstrate both patterns:
```python
import asyncio

from prefect import flow, task
from prefect.futures import as_completed, wait


@task
async def async_task(number: int) -> int:
    await asyncio.sleep(number)  # Simulate varying completion times
    return number * 2


@flow
async def example_as_completed_flow():
    # Submit multiple async task runs; .submit() returns a future immediately
    futures = [async_task.submit(i) for i in [3, 1, 4, 2]]

    # as_completed is a regular generator, so iterate with a plain `for`
    for future in as_completed(futures):
        result = future.result()  # .result() is a blocking call, not awaited
        print(f"Task completed with result: {result}")


@flow
async def example_wait_flow():
    # Submit multiple async task runs
    futures = [async_task.submit(i) for i in [3, 1, 4, 2]]

    # Block until all futures complete; wait() is synchronous
    done, not_done = wait(futures)

    # Process completed futures
    for future in done:
        result = future.result()
        print(f"Completed task result: {result}")

    # Check if any tasks didn't complete (only possible with a timeout)
    for future in not_done:
        print(f"Task did not complete: {future}")


# Example with timeout
@flow
async def example_timeout_flow():
    futures = [async_task.submit(i) for i in [1, 5, 2]]

    # Wait at most 3 seconds; the 5-second task should land in not_done
    done, not_done = wait(futures, timeout=3.0)

    print("Completed within timeout:")
    for future in done:
        result = future.result()
        print(f"Result: {result}")

    print("\nNot completed within timeout:")
    for future in not_done:
        print(f"Pending task: {future}")


# Run the flows
if __name__ == "__main__":
    # Run as_completed example
    asyncio.run(example_as_completed_flow())

    # Run wait example
    asyncio.run(example_wait_flow())

    # Run timeout example
    asyncio.run(example_timeout_flow())
```
Let's break down the key points:

1. `as_completed` usage:
- `as_completed` returns a regular (synchronous) generator that yields futures as they complete, so iterate it with a plain `for` loop, even inside an async flow
- It's useful when you want to process results as soon as they're available
- Results arrive in completion order, not submission order

2. `wait` usage:
- `wait` is a synchronous call that returns two sets: `done` and `not_done`
- It blocks until either all futures complete or the timeout is reached
- It's useful when you need to ensure all tasks are complete before proceeding
- You can specify a timeout to avoid waiting indefinitely

3. Key features:
- Both functions work with `PrefectFuture` objects returned by `.submit()`
- Call `.result()` on a future to get the actual return value; it is a blocking call rather than a coroutine, so it is not awaited
- Timeout parameters help manage long-running tasks

Expected output for `example_as_completed_flow` might look like:
```
Task completed with result: 2  # (1 second task)
Task completed with result: 4  # (2 second task)
Task completed with result: 6  # (3 second task)
Task completed with result: 8  # (4 second task)
```
For the timeout example, tasks taking longer than 3 seconds will appear in the `not_done` set (here, the 5-second task).

Remember that:
- `.submit()`, `wait`, `as_completed`, and `.result()` are synchronous calls, so reserve `await` for actual coroutines
- The futures must be created using `.submit()` on tasks
- Error handling should be added in production code
- These patterns are particularly useful when managing multiple concurrent tasks with different completion times

The examples above demonstrate the most common usage patterns; adapt them to your needs, for example by adding error handling or combining them with other async patterns.
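
If you want to experiment with the ordering and timeout semantics without starting a Prefect run, here is a rough analogy using only the standard library's `concurrent.futures`, which offers functions with the same names and a similar `done` / `not_done` result. This is a sketch of the semantics, not Prefect's implementation; the helper names (`slow_double`, `completion_order`, `split_on_timeout`) and the 0.1-second time unit are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait

UNIT = 0.1  # hypothetical time unit in seconds, to keep the demo fast


def slow_double(number: int) -> int:
    """Sleep for `number` time units, then return twice the input."""
    time.sleep(number * UNIT)
    return number * 2


def completion_order(numbers: list[int]) -> list[int]:
    """Collect results in the order the work finishes, not the order submitted."""
    with ThreadPoolExecutor(max_workers=len(numbers)) as pool:
        futures = [pool.submit(slow_double, n) for n in numbers]
        return [future.result() for future in as_completed(futures)]


def split_on_timeout(numbers: list[int], timeout_units: int) -> tuple[int, int]:
    """Return (done_count, not_done_count) as seen when the timeout expires."""
    with ThreadPoolExecutor(max_workers=len(numbers)) as pool:
        futures = [pool.submit(slow_double, n) for n in numbers]
        done, not_done = wait(futures, timeout=timeout_units * UNIT)
        # Counts are captured here; leaving the `with` block still waits
        # for the unfinished work before the function returns.
        return len(done), len(not_done)


if __name__ == "__main__":
    print(completion_order([3, 1, 4, 2]))
    print(split_on_timeout([1, 5, 2], timeout_units=3))
```

With the same inputs as the flows above, the first call should list results shortest task first, and the second should report two finished futures and one still pending. The timing is real, so a heavily loaded machine could shift the outcome.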