Charles Bournhonesque
07/18/2025, 6:30 PM
Marvin
07/18/2025, 6:30 PM
Marvin
07/18/2025, 6:31 PM
task.submit().
There are two main approaches you can use:
1. Using `result()` for individual futures:
```python
from prefect import flow, task

@task
def my_task(x):
    return x * 2

@flow
async def my_flow():
    # Submit multiple tasks
    future1 = my_task.submit(1)
    future2 = my_task.submit(2)
    # Await their results individually
    result1 = await future1.result()
    result2 = await future2.result()
    return result1, result2
```
2. Using `wait()` for multiple futures (recommended for handling multiple tasks):
```python
from prefect import flow, task
from prefect.futures import wait

@task
def my_task(x):
    return x * 2

@flow
async def my_flow():
    # Submit multiple tasks
    futures = [
        my_task.submit(1),
        my_task.submit(2),
        my_task.submit(3)
    ]
    # Wait for all futures to complete
    done, not_done = await wait(futures)
    # Get results from completed futures
    results = [future.result() for future in done]
    return results
```
Key points to remember:
- The `result()` method is awaitable and will return the task's result or raise any exceptions that occurred during execution
- You can specify a timeout when calling `result()`: `await future.result(timeout=60)`
- The `wait()` function is particularly useful when dealing with multiple futures as it allows you to wait for all of them to complete at once
- If any task fails, `result()` will raise the exception by default. You can change this behavior with `raise_on_failure=False`
This pattern is particularly useful when you want to run multiple tasks concurrently and then gather their results before proceeding with the rest of your flow.
Charles Bournhonesque
07/18/2025, 6:32 PM
Marvin
07/18/2025, 6:33 PM
Marvin
07/18/2025, 6:33 PM
`result()` is not an async method, and I made a mistake in suggesting to `await` it. The `PrefectConcurrentFuture.result()` method is actually a blocking call: it waits for the task, which itself runs in a thread.
Here's the correct way to handle task submission in async flows:
```python
from prefect import flow, task

@task
def my_task(x):
    return x * 2

@flow
async def my_flow():
    # Submit multiple tasks
    future1 = my_task.submit(1)
    future2 = my_task.submit(2)
    # Get results (these are blocking calls, but run in threads)
    result1 = future1.result()
    result2 = future2.result()
    return result1, result2
```
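If you want to see the blocking behavior without Prefect installed, here is a minimal sketch using the standard library's `concurrent.futures`. It is an analogy, not Prefect code: `double` and `run_blocking` are hypothetical names, and the `ThreadPoolExecutor` only stands in for a thread-based task runner.

```python
import inspect
from concurrent.futures import ThreadPoolExecutor


def double(x):
    # Hypothetical stand-in for the task body
    return x * 2


def run_blocking():
    # The executor plays the role of the task runner (assumption for illustration)
    with ThreadPoolExecutor(max_workers=2) as pool:
        future1 = pool.submit(double, 1)
        future2 = pool.submit(double, 2)
        # result() blocks the caller until the worker thread has finished
        result1 = future1.result()
        result2 = future2.result()
    # The returned values are plain objects, so there is nothing to await
    assert not inspect.isawaitable(result1)
    return result1, result2


print(run_blocking())  # prints (2, 4)
```

The `inspect.isawaitable` check is a quick way to confirm whether a value needs `await` before you write it into a flow.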
For multiple futures, you can still use `wait()`. In Prefect 3, `prefect.futures.wait()` is a regular synchronous function, like `result()`, so it is called without `await`:
```python
from prefect import flow, task
from prefect.futures import wait

@task
def my_task(x):
    return x * 2

@flow
async def my_flow():
    futures = [
        my_task.submit(1),
        my_task.submit(2),
        my_task.submit(3)
    ]
    # wait() blocks until the futures complete; it is not awaited
    done, not_done = wait(futures)
    # Get results (blocking calls, but in threads)
    results = [f.result() for f in done]
    return results
```
The key differences from my previous response:
1. `result()` is NOT awaitable - it's a blocking call (but runs in a thread)
2. You don't use `await` with `result()`
3. `wait()` is also a regular blocking function, so it is not awaited either
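One consequence of `result()` being a blocking call: inside an async function it holds up the event loop while it waits. If that matters for your flow, one option is to hand the call to a worker thread with `asyncio.to_thread`. The sketch below uses only the standard library. `slow_double` and `main` are hypothetical names, and a `concurrent.futures` future stands in for a Prefect future, so treat it as an illustration of the pattern rather than Prefect's own API.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(x):
    # Hypothetical stand-in for a task that takes a while
    time.sleep(0.1)
    return x * 2


async def main():
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(slow_double, n) for n in (1, 2, 3)]
        # future.result is a plain blocking method; asyncio.to_thread runs it
        # in a worker thread and gives back an awaitable, so the loop stays free
        results = await asyncio.gather(
            *(asyncio.to_thread(f.result) for f in futures)
        )
    return results


print(asyncio.run(main()))  # prints [2, 4, 6]
```

`asyncio.gather` returns results in the order the awaitables were passed in, which is why the output order matches the inputs here.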
I apologize for any confusion my previous response may have caused. Thank you for the correction!