# prefect-community
Hello Prefect team, a question about Orion. I'm starting to build out an async ML pipeline. I see that the async example on the "Getting Started" page doesn't pass results between tasks. I'm looking into the documentation on PrefectFuture, and the API seems largely similar to `asyncio`'s `Future`. However, before I continue down this async rabbit hole, I'm wondering if there are any gotchas (or docs or source code) I should look into, in particular regarding how to use PrefectFutures in mixed async / sync flows and tasks. Moreover, will I be able to use `asyncio`'s synchronisation primitives (https://docs.python.org/3/library/asyncio-sync.html) with PrefectFutures? Thanks in advance! 🙌
I'm afk so can't test a mix of sync and async atm but regarding data passing with async, check this topic https://discourse.prefect.io/t/is-there-a-cleaner-way-to-pass-async-task-results-downstream/560 (kudos to @Andrew Huang for that!)
@Chris L. I looked a bit more and it looks like you can pass data between tasks in both sync and async flows but you can't have sync tasks in async flows and vice versa. Here are some working examples: Sync flow:
from prefect import task, flow
from prefect import get_run_logger


@task
def get_name():
    return "Marvin"


@task
def say_hi(user_name: str):
    logger = get_run_logger()
    logger.info("Hello %s!", user_name)


@flow
def hello_world():
    user = get_name()
    say_hi(user)


if __name__ == "__main__":
    hello_world()
Async flow
import asyncio
from prefect import task, flow
from prefect import get_run_logger


@task
async def get_name():
    return "Marvin"


@task
async def say_hi(user_name: str):
    logger = get_run_logger()
    logger.info("Hello %s!", user_name)


@flow
async def async_hello_world():
    user = await get_name()
    await say_hi(user)


if __name__ == "__main__":
    asyncio.run(async_hello_world())
Async flow with a result (returning something at the end)
import asyncio
from prefect import task, flow
from prefect import get_run_logger


@task
async def get_name():
    return "Marvin"


@task
async def say_hi(user_name: str):
    logger = get_run_logger()
    logger.info("Hello %s!", user_name)
    return "Orion is awesome!"


@flow
async def async_hello_world_with_final_result():
    user = await get_name()
    return await say_hi(user)


if __name__ == "__main__":
    asyncio.run(async_hello_world_with_final_result())
Did this answer your question?
Some points that may help in understanding the usage:
1. If you use sync tasks, you can call them the same way as in Prefect 1 and pass data as usual.
2. If you use async tasks, you need to `await` the task call; other than that, you can `return` values normally. This means that you can do `task_1 = await my_task(); task_2 = await my_other_task(task_1)`.
3. You can even have synchronous tasks in async flows and vice versa (i.e. you can have async tasks inside sync flows). Inside a sync flow you just can't `await` your async task, because that's invalid Python, but Prefect will make it work.
4. You can use normal synchronous tasks, and when you use the default `ConcurrentTaskRunner`, it will use async under the hood to run them concurrently without you having to explicitly make those tasks async.
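To make points 3 and 4 a bit more concrete, here is a small plain-`asyncio` sketch with no Prefect involved. It only illustrates the underlying Python behaviour and is not a description of how Prefect implements it: `await` inside a regular `def` is rejected by the compiler, a sync caller has to hand the coroutine to an event loop, and blocking functions can be run concurrently from async code via worker threads. The names `get_name`, `slow_square`, `sync_flow` and `run_sync_functions_concurrently` are made up for this example, and `asyncio.to_thread` assumes Python 3.9 or newer.

```python
import asyncio
import time


async def get_name() -> str:
    # Stand-in for an async task.
    await asyncio.sleep(0)
    return "Marvin"


def slow_square(x: int) -> int:
    # Stand-in for a blocking, synchronous task.
    time.sleep(0.1)
    return x * x


# `await` inside a plain `def` does not even compile.
bad_source = "def sync_flow():\n    return await get_name()\n"
try:
    compile(bad_source, "<sketch>", "exec")
    await_in_sync_def_is_valid = True
except SyntaxError:
    await_in_sync_def_is_valid = False


def sync_flow() -> str:
    # A sync caller hands the coroutine to an event loop instead of awaiting it.
    return asyncio.run(get_name())


async def run_sync_functions_concurrently(values: list[int]) -> list[int]:
    # Each blocking call runs in a worker thread; gather keeps the input order.
    calls = (asyncio.to_thread(slow_square, v) for v in values)
    return list(await asyncio.gather(*calls))


if __name__ == "__main__":
    print(await_in_sync_def_is_valid)
    print(sync_flow())
    print(asyncio.run(run_sync_functions_concurrently([1, 2, 3])))
```

In a Prefect flow you would not write this plumbing yourself; the sketch is only meant to show what "invalid Python" refers to and what running sync work concurrently looks like in general.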
Regarding:
"will I be able to use `asyncio`'s synchronisation primitives (https://docs.python.org/3/library/asyncio-sync.html) with PrefectFutures?"
I believe we already use those under the hood, so yes, but @Zanie will know it best (sorry for tagging but we need your async knowledge here 🙂)
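For reference, this is roughly what those primitives look like on their own, using plain coroutines rather than Prefect tasks. Whether they interoperate cleanly with PrefectFutures is not confirmed anywhere in this thread, so treat this as a sketch of the `asyncio` side only. The `worker` and `main` functions and the `stats` dictionary are hypothetical names for this example.

```python
import asyncio


async def worker(name: str, limit: asyncio.Semaphore,
                 ready: asyncio.Event, stats: dict) -> str:
    await ready.wait()              # block until the start signal is set
    async with limit:               # at most two workers hold the semaphore
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)   # simulated I/O
        stats["active"] -= 1
    return name.upper()


async def main() -> tuple:
    ready = asyncio.Event()
    limit = asyncio.Semaphore(2)
    stats = {"active": 0, "peak": 0}
    tasks = [
        asyncio.create_task(worker(n, limit, ready, stats))
        for n in ("a", "b", "c", "d")
    ]
    await asyncio.sleep(0)          # let every worker reach ready.wait()
    ready.set()                     # release all workers at once
    results = await asyncio.gather(*tasks)
    return list(results), stats["peak"]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The `Event` gates the start of all workers, and the `Semaphore` caps how many run the guarded section at the same time, which is the usual way to throttle concurrent calls to a rate-limited service.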