Charles Bournhonesque
03/19/2025, 3:30 PM
from prefect import flow, task
import asyncio
import time

@task
async def my_task():
    time.sleep(10)

@flow()
async def my_flow():
    tasks = []
    for i in range(2):
        tasks.append(asyncio.Task(my_task.submit()))
    done, _ = await asyncio.wait(tasks)
I would expect the `await` to wait for any of the tasks to finish (so at least 10 seconds), but it seems to return immediately.

Marvin
03/19/2025, 3:30 PM
`await` to wait for the tasks to finish. However, there are a few issues in your code snippet:
1. Incorrect Method for Task Submission: In Prefect 2.0, tasks are typically submitted using the `.submit()` method, which returns a future-like object. You cannot directly use `asyncio.Task()` with Prefect tasks.
2. Using `asyncio.wait`: The `asyncio.wait` function waits for the completion of the tasks, but it should be used with the correct tasks/futures.
Here's a revised version of your code:
```python
from prefect import flow, task
import asyncio

@task
async def my_task():
    await asyncio.sleep(10)

@flow
async def my_flow():
    tasks = [my_task.submit() for _ in range(2)]
    done, _ = await asyncio.wait([task.result() for task in tasks])
    for task in done:
        print(task.result())
```
### Key Points:
- Task Submission: Use `my_task.submit()` to submit tasks and obtain a future-like object.
- Awaiting Results: Use `await asyncio.wait([task.result() for task in tasks])` to wait for all tasks to complete.
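
For comparison, here is a minimal plain-asyncio sketch (no Prefect involved) of how `asyncio.wait` behaves when it is handed real `asyncio.Task` objects. The names `work` and `demo` and the short delays are illustrative assumptions, not something from this thread. By default `asyncio.wait` returns once all tasks are done; passing `return_when=asyncio.FIRST_COMPLETED` makes it return as soon as any one of them finishes.

```python
import asyncio
import time


async def work(delay: float) -> float:
    # Non-blocking sleep: yields to the event loop so other tasks can run.
    await asyncio.sleep(delay)
    return delay


async def demo() -> dict:
    # Wrap the coroutines in Tasks; asyncio.wait expects tasks/futures.
    tasks = [asyncio.create_task(work(d)) for d in (0.1, 0.3)]
    start = time.monotonic()

    # Return as soon as any one task has finished.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first_elapsed = time.monotonic() - start
    first_results = sorted(t.result() for t in done)
    pending_after_first = len(pending)

    # Default behaviour (ALL_COMPLETED): block until every task is done.
    all_done, _ = await asyncio.wait(tasks)
    total_elapsed = time.monotonic() - start

    return {
        "first_results": first_results,
        "pending_after_first": pending_after_first,
        "all_results": sorted(t.result() for t in all_done),
        "first_elapsed": first_elapsed,
        "total_elapsed": total_elapsed,
    }


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If the sleep inside `work` were a blocking `time.sleep`, the event loop could not switch between the two tasks while one is sleeping, so they would run one after the other rather than concurrently.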
For more details, you can explore the Prefect 2.0 Task Concepts and Flow Concepts.

Charles Bournhonesque
03/19/2025, 4:06 PM
`task.result()` is the task's future that we want to wait on?
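
On that question: in plain asyncio, at least, `.result()` hands back a task's return value rather than something that can be waited on; the `Task` object itself is what `asyncio.wait` takes. Below is a minimal standard-library sketch of that distinction. The names `compute` and `inspect_task` are made up for illustration, and Prefect's own future type may not behave identically.

```python
import asyncio


async def compute() -> int:
    await asyncio.sleep(0.05)
    return 42


async def inspect_task() -> tuple:
    task = asyncio.create_task(compute())

    # Before the task has finished, result() has no value to return and raises.
    try:
        task.result()
        early = "no error"
    except asyncio.InvalidStateError:
        early = "InvalidStateError"

    # The Task itself is the object to wait on.
    done, _ = await asyncio.wait([task])

    # Once the task is done, result() returns the plain return value.
    value = task.result()
    return early, task in done, value


if __name__ == "__main__":
    print(asyncio.run(inspect_task()))
```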