Chris L.

    Chris L.

    6 months ago
    Hello Prefect team, a question about Orion. I'm starting to build out an async ML pipeline. I see the "Getting Started" page's async example doesn't pass results between tasks. I'm looking into the documentation regarding PrefectFuture and the API seems largely similar to asyncio's
    Future
    . However, before I continue down this async rabbit hole, I'm wondering if there are any gotchas (or docs or source code) I should look into? In particular, regarding how to use PrefectFutures in mixed async / sync flows and tasks. Moreover, will I be able to use asnycio's synchronisation primitives (https://docs.python.org/3/library/asyncio-sync.html) with PrefectFutures? Thanks in advance! 🙌
    Anna Geller

    Anna Geller

    6 months ago
    I'm afk so can't test a mix of sync and async atm but regarding data passing with async, check this topic https://discourse.prefect.io/t/is-there-a-cleaner-way-to-pass-async-task-results-downstream/560 (kudos to @Andrew Huang for that!)
    @Chris L. I looked a bit more and it looks like you can pass data between tasks in both sync and async flows but you can't have sync tasks in async flows and vice versa. Here are some working examples: Sync flow:
    from prefect import task, flow
    from prefect import get_run_logger
    
    
    @task
    def get_name():
        return "Marvin"
    
    
    @task
    def say_hi(user_name: str):
        logger = get_run_logger()
        <http://logger.info|logger.info>("Hello %s!", user_name)
    
    
    @flow
    def hello_world():
        user = get_name()
        say_hi(user)
    
    
    if __name__ == "__main__":
        hello_world()
    Async flow
    import asyncio
    from prefect import task, flow
    from prefect import get_run_logger
    
    
    @task
    async def get_name():
        return "Marvin"
    
    
    @task
    async def say_hi(user_name: str):
        logger = get_run_logger()
        <http://logger.info|logger.info>("Hello %s!", user_name)
    
    
    @flow
    async def async_hello_world():
        user = await get_name()
        await say_hi(user)
    
    
    if __name__ == "__main__":
        asyncio.run(async_hello_world())
    Async flow with a result (returning something at the end)
    import asyncio
    from prefect import task, flow
    from prefect import get_run_logger
    
    
    @task
    async def get_name():
        return "Marvin"
    
    
    @task
    async def say_hi(user_name: str):
        logger = get_run_logger()
        <http://logger.info|logger.info>("Hello %s!", user_name)
        return "Orion is awesome!"
    
    
    @flow
    async def async_hello_world_with_final_result():
        user = await get_name()
        return await say_hi(user)
    
    
    if __name__ == "__main__":
        asyncio.run(async_hello_world_with_final_result())
    Did this answer your question?
    Some points that may help understand the usage:1. If you use
    sync
    tasks, you can call them the same way as in Prefect 1 and pass data as usual 2. If you use
    async
    tasks, you need to use
    await
    before calling that task, other than that you can
    return
    values normally - this means that you can do
    task_1 = await my_task(); task_2 = await my_other_task(task_1)
    3. You can even have
    synchronous
    tasks in
    async
    flows and vice versa (i.e. you can have async tasks inside sync flows). You just can't
    await
    your async task because that's invalid Python, but Prefect will make it work 4. You can use normal
    synchronous
    tasks and when you use the default
    ConcurrentTaskRunner
    , it will use async under the hood to run them concurrently without you having to explicitly make those tasks async
    Regarding:
    will I be able to use asnycio's synchronisation primitives (https://docs.python.org/3/library/asyncio-sync.html) with PrefectFutures?
    I believe we already use those under the hood, so yes, but @Michael Adkins will know it best (sorry for tagging but we need your async knowledge here 🙂)