Chris L.
03/25/2022, 6:22 AM
Future. However, before I continue down this async rabbit hole, I'm wondering if there are any gotchas (or docs or source code) I should look into? In particular, regarding how to use PrefectFutures in mixed async / sync flows and tasks. Moreover, will I be able to use `asyncio`'s synchronisation primitives (https://docs.python.org/3/library/asyncio-sync.html) with PrefectFutures? Thanks in advance! 🙌
Anna Geller
from prefect import task, flow
from prefect import get_run_logger

@task
def get_name():
    return "Marvin"

@task
def say_hi(user_name: str):
    logger = get_run_logger()
    logger.info("Hello %s!", user_name)

@flow
def hello_world():
    user = get_name()
    say_hi(user)

if __name__ == "__main__":
    hello_world()
Async flow
import asyncio
from prefect import task, flow
from prefect import get_run_logger

@task
async def get_name():
    return "Marvin"

@task
async def say_hi(user_name: str):
    logger = get_run_logger()
    logger.info("Hello %s!", user_name)

@flow
async def async_hello_world():
    user = await get_name()
    await say_hi(user)

if __name__ == "__main__":
    asyncio.run(async_hello_world())
Async flow with a result (returning something at the end)
import asyncio
from prefect import task, flow
from prefect import get_run_logger

@task
async def get_name():
    return "Marvin"

@task
async def say_hi(user_name: str):
    logger = get_run_logger()
    logger.info("Hello %s!", user_name)
    return "Orion is awesome!"

@flow
async def async_hello_world_with_final_result():
    user = await get_name()
    return await say_hi(user)

if __name__ == "__main__":
    asyncio.run(async_hello_world_with_final_result())
Did this answer your question?
1. If you use `sync` tasks, you can call them the same way as in Prefect 1 and pass data as usual
2. If you use `async` tasks, you need to use `await` before calling that task; other than that, you can `return` values normally. This means that you can do task_1 = await my_task(); task_2 = await my_other_task(task_1)
3. You can even have synchronous tasks in `async` flows and vice versa (i.e. you can have async tasks inside sync flows). You just can't `await` your async task inside a sync flow because that's invalid Python, but Prefect will make it work
4. You can use normal synchronous tasks, and when you use the default `ConcurrentTaskRunner`, it will use async under the hood to run them concurrently without you having to explicitly make those tasks `async`
> will I be able to use `asyncio`'s synchronisation primitives (https://docs.python.org/3/library/asyncio-sync.html) with PrefectFutures?
I believe we already use those under the hood, so yes, but @Zanie will know it best (sorry for tagging but we need your async knowledge here 🙂)
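For reference, here is a minimal sketch of what using one of those synchronisation primitives looks like in plain `asyncio`. It uses ordinary coroutines as stand-ins for async tasks and does not involve Prefect at all, so whether it behaves the same inside Prefect task runs or with PrefectFutures is an assumption that still needs confirming. The names `fetch`, `main`, `limit`, and `state` are hypothetical and only for illustration.

```python
import asyncio


async def fetch(i: int, sem: asyncio.Semaphore, state: dict) -> int:
    # Hypothetical stand-in for an async task; the semaphore caps concurrency.
    async with sem:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # simulate some I/O-bound work
        state["running"] -= 1
    return i * 2


async def main(limit: int = 2, n: int = 6):
    # Create the primitive inside the running event loop.
    sem = asyncio.Semaphore(limit)
    state = {"running": 0, "peak": 0}
    # gather() schedules all coroutines at once; the semaphore lets only
    # `limit` of them be inside the guarded section at any time.
    results = await asyncio.gather(*(fetch(i, sem, state) for i in range(n)))
    return results, state["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(main())
    print(results, peak)
```

The semaphore is created inside `main()` so that it belongs to the event loop started by `asyncio.run()`. That detail probably matters in a mixed sync / async setup too, since a primitive created under one event loop generally can't be awaited from another.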