Hi, I’m having trouble getting prefect annotations...
# prefect-community
m
Hi, I’m having trouble getting prefect annotations to work with asyncio. I get the code to run fine but mypy complains that the annotated functions are no longer asynchronous. Removing either prefect annotations or typings allows it work Example:
Copy code
import asyncio
from prefect import flow, task


@task
async def slow_task(a_value: int) -> int:
    await asyncio.sleep(1)
    return a_value


@flow
async def my_flow(a_value: int) -> int:
    return await slow_task(a_value)

if __name__ == "__main__":
    result: int = asyncio.run(my_flow(42))
    print(result)
results from mypy:
Copy code
test.py:13: error: Incompatible types in "await" (actual type "None", expected type "Awaitable[Any]")
test.py:17: error: Argument 1 to "run" has incompatible type "None"; expected "Coroutine[Any, Any, <nothing>]"
Any tips on making typings work with asyncio and prefect
1
1
t
Hi Michael, I wonder if you have tried adding
.submit()
when you call your task runs. This will allow them to be called asynchronously. Check out this page in the docs for more info: https://docs.prefect.io/tutorials/execution/?h=.sub#result
m
Thank you. That did resolve the type issue on the task but still have the type issue on the flow. Is there something similar to submit for flows?
Copy code
import asyncio
from prefect import flow, task


@task
async def slow_task(a_value: int) -> int:
    await asyncio.sleep(1)
    return a_value


@flow
async def my_flow(a_value: int) -> int:
    future = await slow_task.submit(a_value)
    return await future.result()

if __name__ == "__main__":
    result: int = asyncio.run(my_flow(42))
    print(result)
t
First class support for asynchronous sub-flow runs is on the roadmap, in the mean time, here is a workaround for creating a flow run for a deployment via an API call: How to create a flow run from deployment (orchestrator pattern)? It won’t be linked as a subflow to the parent flow, but you can create new flow runs from your current one which would then be run concurrently by agents. This is necessary for having them run on new infrastructure. If you are just looking to have subflows run asynchronously/concurrently locally, you should already be able to do that with asyncio primitives.
m
thank you
a
I'm running into this too - Michael did you resolve this via casting to an Awaitable, or #type:ignore-ing, or a third thing? 😄
m
Yes, I cast the flow. There is probably a clever general purpose solution but this is what I have for now:
Copy code
import asyncio
from prefect import flow, task
from typing import Coroutine, cast, Callable


@task
async def slow_task(a_value: int) -> int:
    await asyncio.sleep(1)
    return a_value


@flow
async def _my_flow(a_value: int) -> int:
    future = await slow_task.submit(a_value)
    return await future.result()

my_flow = cast(
    Callable[[int], Coroutine[int, None, int]],
    _my_flow,
)

if __name__ == "__main__":
    result = asyncio.run(my_flow(42))
    print(result)
🙏 1