What is the correct method to call a synchronous t...
# prefect-community
i
What is the correct method to call a synchronous task from an asynchronous flow in Prefect 2? When trying to this with the attached code, our asynchronous `async_task`’s will execute, but our
sync_task
will never start and the flow just seems to hang. The intention here is to run some asynchronous tasks, gather the results, then send those results to a synchronous or blocking task that can be executed sequentially (or submitted to a concurrent task runner). The equivalent code works in a standard Python script if we remove our Prefect decorators.
Copy code
from prefect import flow, task
import asyncio, time

@task
async def async_task(n):
    await asyncio.sleep(1)
    return n

@task
def sync_task(res):
    time.sleep(1)
    print(res)

@flow
async def async_flow():
    res = await asyncio.gather(*(async_task(n) for n in range(0, 3, 1)))
    sync_task(res)


if __name__ == "__main__":
    asyncio.run(async_flow())
See output in replies.
1
Our output in Prefect
2.6.0
reads like this, then the execution just hangs (it looks like the
sync_task
never starts):
Copy code
13:22:40.277 | INFO    | prefect.engine - Created flow run 'tourmaline-pronghorn' for flow 'async-flow'
13:22:41.313 | INFO    | Flow run 'tourmaline-pronghorn' - Created task run 'async_task-1da93ad6-0' for task 'async_task'
13:22:41.314 | INFO    | Flow run 'tourmaline-pronghorn' - Executing 'async_task-1da93ad6-0' immediately...
13:22:41.349 | INFO    | Flow run 'tourmaline-pronghorn' - Created task run 'async_task-1da93ad6-2' for task 'async_task'
13:22:41.350 | INFO    | Flow run 'tourmaline-pronghorn' - Executing 'async_task-1da93ad6-2' immediately...
13:22:41.370 | INFO    | Flow run 'tourmaline-pronghorn' - Created task run 'async_task-1da93ad6-1' for task 'async_task'
13:22:41.370 | INFO    | Flow run 'tourmaline-pronghorn' - Executing 'async_task-1da93ad6-1' immediately...
13:22:42.872 | INFO    | Task run 'async_task-1da93ad6-0' - Finished in state Completed()
13:22:42.942 | INFO    | Task run 'async_task-1da93ad6-2' - Finished in state Completed()
13:22:42.962 | INFO    | Task run 'async_task-1da93ad6-1' - Finished in state Completed()
k
This is a bug that we are working on fixing. This is the workaround for now:
Copy code
from prefect import flow, task
import asyncio, time


@task
async def async_task(n):
    await asyncio.sleep(1)
    return n


@task
def sync_task(res):
    time.sleep(1)
    res.append(1)


@flow
async def async_flow():
    res = await asyncio.gather(*(async_task(n) for n in range(0, 3, 1)))
    return res


@flow
def sync_flow(res):
    return sync_task(res)


if __name__ == "__main__":
    res = asyncio.run(async_flow())
    sync_flow(res)
Basically, you create one flow for asynchronous tasks and one flow for synchronous tasks
i
Thanks for the response Khuyen. This work-around unfortunately seems less than ideal; unless I’m misunderstanding we would need to potentially create a pretty large number of flows if we have some workloads that need to mix a lot of synchronous and asynchronous functionality. Is it better/possible to do this using sub-flows instead? Separately, is this bug logged anywhere we can track and are there any work-arounds that do not require us to create separate flows for workloads that require both async and sync functionality?
k
@Mason Menges Maybe you know where the issues are?
m
There are actually several around mixing Async and Sync tasks these are two that I know of currently https://github.com/PrefectHQ/prefect/issues/7422, https://github.com/PrefectHQ/prefect/issues/7040. That said I know for certain theres active work going on to improve the Orion engine reliability and mixing async and synchronous tasks is a specific problem we're tackling in that process.
🙏 1
i
Thanks Mason and Khuyen. Do we need to create a new issue for the “hanging” bug I’m describing? It doesn’t seem like either of these two issues are the same as what I’m describing unless I’m misunderstanding them.
It looks like there is another issue here of mixing async and sync tasks in an async flow but using .submit() method rather than just calling a task directly which apparently doesn’t hang and throws an error. https://github.com/PrefectHQ/prefect/issues/6318
m
This is a pretty clear reproducible example so I think that'd be useful if you'd be willing to 😄
👍 1
i
Will do, thanks Mason.
gratitude thank you 1