Hey team!! Would someone mind helping me check whether I'm running into a bug, or whether my understanding of Dask task runners is incorrect?
I'm trying to write an asynchronous flow using a Dask task runner. When I use a concurrent task runner, everything is sweet; the following flow passes without any issues:
import asyncio
import prefect
from prefect.task_runners import ConcurrentTaskRunner

@prefect.task
async def square(x: int) -> int:
    return x * x

@prefect.flow(name="test", task_runner=ConcurrentTaskRunner())
async def my_flow():
    future = await square.submit(3)
    result = await future.result()

asyncio.run(my_flow())
But when I switch to a Dask task runner:
import asyncio
import prefect
from prefect_dask import DaskTaskRunner

@prefect.task
async def square(x: int) -> int:
    return x * x

@prefect.flow(name="test", task_runner=DaskTaskRunner())
async def my_flow():
    future = await square.submit(3)
    result = await future.result()

asyncio.run(my_flow())
I start getting a TypeError telling me that a State object is not awaitable:
TypeError: object State can't be used in 'await' expression
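As far as I can tell, this is just Python's generic error for applying `await` to an object that isn't awaitable, so it looks like one of the two awaited calls is returning a plain State object instead of something awaitable under the Dask runner. Here is a minimal sketch of the same kind of failure without Prefect. The `State` class and `get_result` function are hypothetical stand-ins I made up for illustration, not Prefect's own, and the exact wording of the message may differ between Python versions:

```python
import asyncio


class State:
    """Hypothetical stand-in (not Prefect's State): a plain object with no __await__."""


async def get_result() -> State:
    # The coroutine resolves to a plain object, so nothing is left to await afterwards.
    return State()


async def main() -> str:
    state = await get_result()  # fine: the coroutine itself is awaitable
    try:
        await state  # a plain object is not awaitable, so this raises TypeError
    except TypeError as exc:
        return str(exc)
    return "no error"


message = asyncio.run(main())
print(message)
```

If that is what is going on, the second `await` in my flow would only be valid when the call before it returns an awaitable, which might be the part that differs between the two task runners.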
Does this look like a bug to anyone, or is it a limitation of Dask task runners that we can't await PrefectFuture results? This was done using prefect 2.10.5 and prefect-dask 0.2.3.