<@ULVA73B9P> how can i await async tasks that were...
# ask-marvin
c
@Marvin how can i await async tasks that were submitted inside an async flow using task.submit()? That returns a PrefectConcurrentFuture that doesn't have any asynchronous function
m
thought for 45.2 seconds
Let me provide you with a clear explanation and example of how to handle async tasks submitted with `task.submit()`. There are two main approaches you can use:

1. Using `result()` for individual futures:

```python
from prefect import flow, task

@task
def my_task(x):
    return x * 2

@flow
async def my_flow():
    # Submit multiple tasks
    future1 = my_task.submit(1)
    future2 = my_task.submit(2)

    # Await their results individually
    result1 = await future1.result()
    result2 = await future2.result()
    return result1, result2
```
2. Using `wait()` for multiple futures (recommended for handling multiple tasks):

```python
from prefect import flow, task
from prefect.futures import wait

@task
def my_task(x):
    return x * 2

@flow
async def my_flow():
    # Submit multiple tasks
    futures = [
        my_task.submit(1),
        my_task.submit(2),
        my_task.submit(3)
    ]

    # Wait for all futures to complete
    done, not_done = await wait(futures)

    # Get results from completed futures
    results = [future.result() for future in done]
    return results
```
Key points to remember:
- The `result()` method is awaitable and will return the task's result or raise any exceptions that occurred during execution
- You can specify a timeout when calling `result()`: `await future.result(timeout=60)`
- The `wait()` function is particularly useful when dealing with multiple futures as it allows you to wait for all of them to complete at once
- If any task fails, `result()` will raise the exception by default. You can change this behavior with `raise_on_failure=False`

This pattern is particularly useful when you want to run multiple tasks concurrently and then gather their results before proceeding with the rest of your flow.
c
@Marvin how come the result() method is awaitable? that doesn't seem to be the case https://github.com/PrefectHQ/prefect/blob/3.2.7/src/prefect/futures.py#L75
m
thought for 51.7 seconds
Let me correct my previous response. You're absolutely correct: `result()` is not an async method, and I made a mistake in suggesting to `await` it. `PrefectConcurrentFuture.result()` is a synchronous, blocking call: with the default thread-based task runner the submitted task runs in a worker thread, and `result()` blocks the caller until that task has finished. Here's the correct way to handle task submission in async flows:

```python
from prefect import flow, task

@task
def my_task(x):
    return x * 2

@flow
async def my_flow():
    # Submit multiple tasks
    future1 = my_task.submit(1)
    future2 = my_task.submit(2)

    # Get results (blocking calls; the tasks themselves run in worker threads)
    result1 = future1.result()
    result2 = future2.result()
    return result1, result2
```
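Because `result()` blocks the calling thread, calling it directly inside an async flow pauses the event loop while it waits. If that matters, one option is to hand the blocking call to a worker thread and await that instead. The sketch below uses only the standard library to illustrate the pattern: `ThreadPoolExecutor` stands in for a thread-based task runner, and `double` and `gather_results` are hypothetical names, so this is not Prefect's own API.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def double(x):
    """Hypothetical stand-in for a submitted task; sleeps to simulate work."""
    time.sleep(0.1)
    return x * 2


async def gather_results(values):
    """Submit work to threads, then collect results without blocking the loop."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(double, v) for v in values]
        # future.result() is a blocking call; asyncio.to_thread runs it in a
        # worker thread so other coroutines can keep running in the meantime.
        return await asyncio.gather(
            *(asyncio.to_thread(f.result) for f in futures)
        )


results = asyncio.run(gather_results([1, 2, 3]))
print(results)  # [2, 4, 6]
```

The same idea should carry over to any future whose `result()` is a plain blocking method, but whether the extra thread hop is worth it depends on how long the flow would otherwise sit idle.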
For multiple futures, you can use `wait()` from `prefect.futures`. Note that it is also a regular synchronous function, so it is called without `await`:

```python
from prefect import flow, task
from prefect.futures import wait

@task
def my_task(x):
    return x * 2

@flow
async def my_flow():
    futures = [
        my_task.submit(1),
        my_task.submit(2),
        my_task.submit(3)
    ]

    # wait() is synchronous: it blocks until the futures complete
    done, not_done = wait(futures)

    # Get results (blocking calls; read from the original list
    # so the results keep submission order)
    results = [f.result() for f in futures]
    return results
```

The key differences from my previous response:
1. `result()` is NOT awaitable; it's a blocking call (the tasks themselves run in threads)
2. You don't use `await` with `result()`
3. `wait()` is also synchronous and is called without `await`

I apologize for any confusion my previous response may have caused. Thank you for the correction!
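To make the done/not-done pattern concrete without depending on Prefect, here is a minimal standard-library analogue. It uses `concurrent.futures.wait`, not `prefect.futures.wait`; the assumption is only that both return a done/not-done pair, as the unpacking in the snippets in this thread suggests. `double` is a hypothetical task.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def double(x):
    """Hypothetical stand-in for a submitted task."""
    return x * 2


with ThreadPoolExecutor() as pool:
    futures = [pool.submit(double, v) for v in (1, 2, 3)]
    # wait() is a plain function: it blocks until the futures finish
    # and returns two sets, so there is nothing to await.
    done, not_done = wait(futures)

# Sets are unordered; read from the original list to keep submission order.
results = [f.result() for f in futures]
print(len(done), len(not_done), results)  # 3 0 [2, 4, 6]
```

Reading results from the original list rather than from `done` is a small design choice: it keeps the output aligned with the inputs regardless of which task finished first.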