Hi, in prefect 2.0; I have a non-async flow-run th...
# ask-community
c
Hi, in Prefect 2.0, I have a non-async flow run that submits async tasks. I don't call await anywhere, but things seem to work fine. Does the flow run wait for all async tasks to finish before returning? How do I get the same behaviour with an async flow run? Do I need to await my async tasks manually?
n
hi @Charles Bournhonesque - we have some docs on this here and there's also more context here
c
I'm using Prefect 2.0. One issue I'm getting is that if I try to do:
import asyncio
import time

from prefect import flow, task

@task
async def my_task():
    time.sleep(10)


@flow()
async def my_flow():
    tasks = []
    for i in range(2):
        tasks.append(my_task.submit())
    done, _ = await asyncio.wait(tasks)
I would expect `done` to contain the tasks that have finished (so after at least 10 seconds), but instead it seems to contain PrefectFutures that have been submitted, i.e. `my_task.submit()` seems to create a Future whose result is the PrefectFuture. I've tried doing
@flow()
async def my_flow():
    tasks = []
    for i in range(2):
        tasks.append(await my_task.submit())
    done, _ = await asyncio.wait(tasks)
so that `tasks` contains the `PrefectFuture`. But it doesn't seem to wait. In that case `tasks` waits 10 seconds and contains the actual result of the task.
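For readers following along, the observation above (waiting on the submissions gives back future-like objects rather than finished work) can be reproduced with plain asyncio. This is only a sketch of the pattern, not Prefect's implementation: `Handle`, `submit`, and `work` are hypothetical names invented for the illustration, and the 0.2 second sleep stands in for the real task body.

```python
import asyncio


class Handle:
    """Hypothetical stand-in for a future-like object returned by a submit call."""

    def __init__(self, background: asyncio.Task):
        self.background = background

    async def wait(self):
        # Block until the background work itself has finished.
        return await self.background


async def work(i: int) -> int:
    await asyncio.sleep(0.2)  # stands in for the real task body
    return i * 10


async def submit(i: int) -> Handle:
    # Scheduling returns right away with a handle; the work keeps running.
    return Handle(asyncio.create_task(work(i)))


async def main():
    submissions = [asyncio.create_task(submit(i)) for i in range(2)]
    done, _ = await asyncio.wait(submissions)
    # Each completed submission yields a handle, not the work's result.
    handles = [t.result() for t in done]
    finished_early = [h.background.done() for h in handles]
    # Waiting on the handles themselves is what waits for the work.
    results = sorted([await h.wait() for h in handles])
    return finished_early, results


finished_early, results = asyncio.run(main())
print(finished_early, results)
```

In this model, `asyncio.wait` only confirms that the submissions completed; the underlying work is still pending until each handle is waited on.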
n
well, a couple of points:
• putting time.sleep in an "async" task will result in your code being blocking and effectively not async; asyncio.sleep would be better
• that's not how you're meant to use submit / futures in prefect. you can choose to use tasks or flows like normal python with std python async like asyncio/anyio, or instead defer that responsibility to prefect by using submit/map/result etc
» uvx --with prefect==2.20 ipython

#[1]
from prefect import flow, task

#[2]
import asyncio

@task
async def my_task():
    await asyncio.sleep(10)

@flow
async def my_flow():
    futures = [await my_task.submit() for _ in range(3)]
    [await fut.wait() for fut in futures]


#[3]
await my_flow()
2.x docs
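The first point in the reply (time.sleep inside an async function blocks the event loop) can be checked without Prefect at all. The sketch below uses only the standard library; the function names `blocking_task`, `cooperative_task`, and `timed` are made up for the illustration, and the sleep is shortened to 0.2 seconds.

```python
import asyncio
import time


async def blocking_task():
    time.sleep(0.2)  # blocks the event loop; nothing else can run meanwhile


async def cooperative_task():
    await asyncio.sleep(0.2)  # yields to the event loop while waiting


async def timed(factory, n=3):
    # Run n copies concurrently and measure the total wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(factory() for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_task))
cooperative_elapsed = asyncio.run(timed(cooperative_task))
print(f"time.sleep:    {blocking_elapsed:.2f}s")
print(f"asyncio.sleep: {cooperative_elapsed:.2f}s")
```

With three copies, the time.sleep version should take roughly three times as long as the asyncio.sleep version, since the blocking calls run one after another even though they were gathered together.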