I'm trying to write an asynchronous flow using a dask task runner. When I use a concurrent task runner, everything is sweet, the following flow passes without any issues
import asyncio
import prefect
from prefect.task_runners import ConcurrentTaskRunner
@prefect.task
async def square(x: int) -> int:
return x * x
@prefect.flow(name="test", task_runner=ConcurrentTaskRunner())
async def my_flow():
future = await square.submit(3)
result = await future.result()
asyncio.run(my_flow())
Now compared to when I switch a dask task runner -
import asyncio
import prefect
from prefect_dask import DaskTaskRunner
@prefect.task
async def square(x: int) -> int:
return x * x
@prefect.flow(name="test", task_runner=DaskTaskRunner())
async def my_flow():
future = await square.submit(3)
result = await future.result()
asyncio.run(my_flow())
I start getting a TypeError telling me that a State object is not awaitable,
TypeError: object State can't be used in 'await' expression