<@ULVA73B9P> I am using Prefect 2.0; and this is m...
# marvin-ai
c
@Marvin I am using Prefect 2.0; and this is my code:
Copy code
import asyncio
import time

@task
async def my_task():
    time.sleep(10)


@flow()
async def my_flow():
    tasks = []
    for i in range(2):
        tasks.push(asyncio.Task(my_task.submit())
    done, _ = await asyncio.wait(tasks)
I would expect the
await
to wait for any of the tasks to finish (so at least 10 seconds), but it seems to return immediately
m
It seems like you're trying to execute asynchronous tasks within a Prefect 2.0 flow and expect the
await
to wait for the tasks to finish. However, there are a few issues in your code snippet: 1. Incorrect Method for Task Submission: In Prefect 2.0, tasks are typically submitted using the
.submit()
method, which returns a future-like object. You cannot directly use
asyncio.Task()
with Prefect tasks. 2. Using
asyncio.wait
: The
asyncio.wait
method is used to wait for the completion of the tasks, but it should be used with the correct tasks/futures. Here's a revised version of your code:
Copy code
python
from prefect import flow, task
import asyncio

@task
async def my_task():
    await asyncio.sleep(10)

@flow
async def my_flow():
    tasks = [my_task.submit() for _ in range(2)]
    done, _ = await asyncio.wait([task.result() for task in tasks])
    for task in done:
        print(task.result())
### Key Points: - Task Submission: Use
my_task.submit()
to submit tasks and obtain a future-like object. - Awaiting Results: Use
await asyncio.wait([task.result() for task in tasks])
to wait for all tasks to complete. For more details, you can explore the Prefect 2.0 Task Concepts and Flow Concepts.
c
Ah,
task.result()
is the task's future that we want to wait on?