Charles Bournhonesque
03/19/2025, 2:14 PMCharles Bournhonesque
03/19/2025, 3:36 PMimport asyncio
import time
@task
async def my_task():
time.sleep(10)
@flow()
async def my_flow():
tasks = []
for i in range(2):
tasks.push(my_task.submit())
done, _ = await asyncio.wait(tasks)
I would expect done
to contain the tasks that have finished (so after at least 10 seconds), but instead it seems to contain PrefectFutures that have been submitted.
i.e. my_task.submit()
seems to create a Future whose result is the PrefectFuture.
I've tried doing
@flow()
async def my_flow():
tasks = []
for i in range(2):
tasks.push(await my_task.submit())
done, _ = await asyncio.wait(tasks)
so that tasks
contains the PrefectFuture
. But it doesn't seem to wait. In that case tasks
waits 10 seconds and contains the actual result of the task..Nate
03/19/2025, 5:13 PM» uvx --with prefect==2.20 ipython
#[1]
from prefect import flow, task
#[2]
import asyncio
@task
async def my_task():
await asyncio.sleep(10)
@flow
async def my_flow():
futures = [await my_task.submit() for _ in range(3)]
[await fut.wait() for fut in futures]
#[3]
await my_flow()
2.x docs