Ilya Galperin
11/15/2022, 9:23 PM
sync_task will never start, and the flow just seems to hang. The intention here is to run some asynchronous tasks, gather the results, then send those results to a synchronous (blocking) task that can be executed sequentially or submitted to a concurrent task runner.
The equivalent code works in a standard Python script if we remove our Prefect decorators.
from prefect import flow, task
import asyncio, time

@task
async def async_task(n):
    await asyncio.sleep(1)
    return n

@task
def sync_task(res):
    time.sleep(1)
    print(res)

@flow
async def async_flow():
    res = await asyncio.gather(*(async_task(n) for n in range(0, 3, 1)))
    sync_task(res)

if __name__ == "__main__":
    asyncio.run(async_flow())
See output in replies.
Output on 2.6.0 reads like this, then the execution just hangs (it looks like sync_task never starts):
13:22:40.277 | INFO | prefect.engine - Created flow run 'tourmaline-pronghorn' for flow 'async-flow'
13:22:41.313 | INFO | Flow run 'tourmaline-pronghorn' - Created task run 'async_task-1da93ad6-0' for task 'async_task'
13:22:41.314 | INFO | Flow run 'tourmaline-pronghorn' - Executing 'async_task-1da93ad6-0' immediately...
13:22:41.349 | INFO | Flow run 'tourmaline-pronghorn' - Created task run 'async_task-1da93ad6-2' for task 'async_task'
13:22:41.350 | INFO | Flow run 'tourmaline-pronghorn' - Executing 'async_task-1da93ad6-2' immediately...
13:22:41.370 | INFO | Flow run 'tourmaline-pronghorn' - Created task run 'async_task-1da93ad6-1' for task 'async_task'
13:22:41.370 | INFO | Flow run 'tourmaline-pronghorn' - Executing 'async_task-1da93ad6-1' immediately...
13:22:42.872 | INFO | Task run 'async_task-1da93ad6-0' - Finished in state Completed()
13:22:42.942 | INFO | Task run 'async_task-1da93ad6-2' - Finished in state Completed()
13:22:42.962 | INFO | Task run 'async_task-1da93ad6-1' - Finished in state Completed()
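For comparison, the decorator-free version mentioned in the original message might look roughly like the sketch below. It is the same code with the Prefect decorators removed. The `delay` parameter, the `async_main` name, and the return value from `sync_task` are assumptions added here for illustration, so the script runs quickly and its result can be checked. They are not part of the original post.

```python
import asyncio
import time


async def async_task(n, delay=0.1):
    # Non-blocking wait; stands in for real asynchronous I/O.
    # `delay` is a hypothetical parameter (the original sleeps for 1 second).
    await asyncio.sleep(delay)
    return n


def sync_task(res, delay=0.1):
    # Blocking call: it runs on the event loop's thread and holds it until done.
    time.sleep(delay)
    print(res)
    # Returning the value is an addition for illustration; the original only prints.
    return res


async def async_main():
    # asyncio.gather returns results in the order the awaitables were passed in.
    res = await asyncio.gather(*(async_task(n) for n in range(3)))
    # Plain function call from inside the coroutine, as in the original flow body.
    return sync_task(res)


if __name__ == "__main__":
    asyncio.run(async_main())  # prints [0, 1, 2]
```

In plain asyncio the blocking call simply stalls the event loop for its duration and then returns, which is consistent with the report that the hang only appears once the Prefect decorators are applied.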
Khuyen Tran
11/15/2022, 10:20 PM
from prefect import flow, task
import asyncio, time

@task
async def async_task(n):
    await asyncio.sleep(1)
    return n

@task
def sync_task(res):
    time.sleep(1)
    res.append(1)

@flow
async def async_flow():
    res = await asyncio.gather(*(async_task(n) for n in range(0, 3, 1)))
    return res

@flow
def sync_flow(res):
    return sync_task(res)

if __name__ == "__main__":
    res = asyncio.run(async_flow())
    sync_flow(res)
Basically, you create one flow for the asynchronous tasks and one flow for the synchronous tasks.
Ilya Galperin
11/15/2022, 10:38 PM
Khuyen Tran
11/15/2022, 10:48 PM
Mason Menges
11/15/2022, 11:12 PM
Ilya Galperin
11/15/2022, 11:15 PM
Mason Menges
11/15/2022, 11:45 PM
Ilya Galperin
11/15/2022, 11:54 PM