Michael Maletich
09/15/2022, 11:50 AMimport asyncio
from prefect import flow, task
@task
async def slow_task(a_value: int) -> int:
await asyncio.sleep(1)
return a_value
@flow
async def my_flow(a_value: int) -> int:
return await slow_task(a_value)
if __name__ == "__main__":
result: int = asyncio.run(my_flow(42))
print(result)
results from mypy:
test.py:13: error: Incompatible types in "await" (actual type "None", expected type "Awaitable[Any]")
test.py:17: error: Argument 1 to "run" has incompatible type "None"; expected "Coroutine[Any, Any, <nothing>]"
Any tips on making typings work with asyncio and prefectTaylor Curran
09/15/2022, 12:17 PM.submit()
when you call your task runs. This will allow them to be called asynchronously.
Check out this page in the docs for more info:
https://docs.prefect.io/tutorials/execution/?h=.sub#resultMichael Maletich
09/15/2022, 1:17 PMimport asyncio
from prefect import flow, task
@task
async def slow_task(a_value: int) -> int:
await asyncio.sleep(1)
return a_value
@flow
async def my_flow(a_value: int) -> int:
future = await slow_task.submit(a_value)
return await future.result()
if __name__ == "__main__":
result: int = asyncio.run(my_flow(42))
print(result)
Taylor Curran
09/15/2022, 2:41 PMMichael Maletich
09/15/2022, 5:24 PMAlex Turek
09/16/2022, 9:44 PMMichael Maletich
09/19/2022, 11:17 AMimport asyncio
from prefect import flow, task
from typing import Coroutine, cast, Callable
@task
async def slow_task(a_value: int) -> int:
await asyncio.sleep(1)
return a_value
@flow
async def _my_flow(a_value: int) -> int:
future = await slow_task.submit(a_value)
return await future.result()
my_flow = cast(
Callable[[int], Coroutine[int, None, int]],
_my_flow,
)
if __name__ == "__main__":
result = asyncio.run(my_flow(42))
print(result)