Michael Maletich

09/15/2022, 11:50 AM
Hi, I’m having trouble getting Prefect decorators to work with asyncio under type checking. The code runs fine, but mypy complains that the decorated functions are no longer asynchronous. Removing either the Prefect decorators or the type annotations makes the errors go away. Example:
import asyncio
from prefect import flow, task


@task
async def slow_task(a_value: int) -> int:
    await asyncio.sleep(1)
    return a_value


@flow
async def my_flow(a_value: int) -> int:
    return await slow_task(a_value)

if __name__ == "__main__":
    result: int = asyncio.run(my_flow(42))
    print(result)
Results from mypy:
test.py:13: error: Incompatible types in "await" (actual type "None", expected type "Awaitable[Any]")
test.py:17: error: Argument 1 to "run" has incompatible type "None"; expected "Coroutine[Any, Any, <nothing>]"
Any tips on making type annotations work with asyncio and Prefect?

Taylor Curran

09/15/2022, 12:17 PM
Hi Michael, I wonder if you have tried adding .submit() when you call your tasks. This will allow them to be called asynchronously. Check out this page in the docs for more info: https://docs.prefect.io/tutorials/execution/?h=.sub#result

Michael Maletich

09/15/2022, 1:17 PM
Thank you. That resolved the type issue on the task, but I still have the type issue on the flow. Is there something similar to submit for flows?
import asyncio
from prefect import flow, task


@task
async def slow_task(a_value: int) -> int:
    await asyncio.sleep(1)
    return a_value


@flow
async def my_flow(a_value: int) -> int:
    future = await slow_task.submit(a_value)
    return await future.result()

if __name__ == "__main__":
    result: int = asyncio.run(my_flow(42))
    print(result)

Taylor Curran

09/15/2022, 2:41 PM
First-class support for asynchronous subflow runs is on the roadmap. In the meantime, here is a workaround for creating a flow run for a deployment via an API call: "How to create a flow run from deployment (orchestrator pattern)?" It won’t be linked as a subflow to the parent flow, but you can create new flow runs from your current one, which would then be run concurrently by agents. This is necessary for having them run on new infrastructure. If you are just looking to have subflows run asynchronously/concurrently locally, you should already be able to do that with asyncio primitives.
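For the local case, a minimal sketch of what "asyncio primitives" could look like. The names child_flow and parent_flow are hypothetical, and the children are plain coroutines standing in for @flow-decorated subflows, on the assumption that a decorated async flow can be awaited the same way:

```python
import asyncio


# Plain coroutine standing in for an @flow-decorated subflow (assumption:
# a decorated async flow is awaited the same way as a bare coroutine).
async def child_flow(n: int) -> int:
    await asyncio.sleep(0.1)
    return n * 2


async def parent_flow() -> list[int]:
    # gather() runs the child coroutines concurrently and returns their
    # results in the order the coroutines were passed in.
    return await asyncio.gather(*(child_flow(n) for n in (1, 2, 3)))


if __name__ == "__main__":
    print(asyncio.run(parent_flow()))
```

The three children sleep concurrently, so the parent takes roughly as long as one child rather than the sum of all three.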

Michael Maletich

09/15/2022, 5:24 PM
thank you

Alex Turek

09/16/2022, 9:44 PM
I'm running into this too. Michael, did you resolve this via casting to an Awaitable, or # type: ignore-ing, or a third thing? 😄

Michael Maletich

09/19/2022, 11:17 AM
Yes, I cast the flow. There is probably a clever general-purpose solution, but this is what I have for now:
import asyncio
from prefect import flow, task
from typing import Coroutine, cast, Callable


@task
async def slow_task(a_value: int) -> int:
    await asyncio.sleep(1)
    return a_value


@flow
async def _my_flow(a_value: int) -> int:
    future = await slow_task.submit(a_value)
    return await future.result()

my_flow = cast(
    Callable[[int], Coroutine[int, None, int]],
    _my_flow,
)

if __name__ == "__main__":
    result = asyncio.run(my_flow(42))
    print(result)