I've utilized asyncio within my flow. I am getting...
# ask-community
d
I'm using asyncio within my flow, and I get errors when I use the task decorator. If I remove the decorator, it works fine.
• I'm unclear on how to fix this.
• Should I abandon using asyncio in Prefect? In hindsight it isn't necessary if I use the task decorator with async in mind...
Here is a simplified look at the code:
async def process_workspace(workspace):
    datasets_of_workspace = await get_datasets(workspace)  # error here
    ...

@task
async def get_datasets(workspace):
    ...
    return datasets

refreshing_datasets = await asyncio.gather( *[process_workspace(workspace) for workspace in workspaces])
And here is the error
TypeError: object list can't be used in 'await' expression
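For context, the message says that the value handed to `await` was already a plain list rather than an awaitable. Below is a minimal sketch of that failure mode using only the standard library. `get_datasets_plain` is a hypothetical stand-in, not Prefect code; it only assumes that the decorated call returned a finished list in this context.

```python
import asyncio


def get_datasets_plain(workspace):
    # Hypothetical stand-in (not Prefect code): a call that hands back
    # a finished list instead of a coroutine.
    return [1, 2, 3]


async def process_workspace(workspace):
    try:
        # Awaiting a plain list is what raises the TypeError.
        return await get_datasets_plain(workspace)
    except TypeError as exc:
        # Report the exception type so the failure is visible.
        return type(exc).__name__


outcome = asyncio.run(process_workspace(1))
print(outcome)
```

The exact wording of the message can differ between Python versions, so the sketch reports only the exception type.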
j
Hey, you can definitely use asyncio inside of Prefect. For example, this version of your code works fine for me:
import asyncio
from prefect import flow, task

async def process_workspace(workspace):
    datasets_of_workspace = await get_datasets(workspace)
    return datasets_of_workspace

@task
async def get_datasets(workspace):
    return workspace + 1

@flow
async def my_flow():
    workspaces = [1, 2, 3]
    refreshing_datasets = await asyncio.gather(*[process_workspace(workspace) for workspace in workspaces])
    for r in refreshing_datasets:
        print(r)

if __name__ == '__main__':
    asyncio.run(my_flow())
d
Thanks Jake! You're right. Here is a better, complete simplified example that gives me the error:
from prefect import flow, task
import asyncio


@task
async def get_datasets(workspace):
    return [1, 2, 3]

async def process_workspace(workspace):
    datasets_of_workspace = await get_datasets(workspace)
    return datasets_of_workspace

async def run_async_refresh(workspaces: list):
    refreshing_datasets = await asyncio.gather(*[process_workspace(ws) for ws in workspaces])
    return refreshing_datasets

async def run(workspaces):
    results = await run_async_refresh(workspaces)

@flow(name='pbi-refresh-all-datasets-flow')
def async_run():
    asyncio.run(run(workspaces=[1, 2, 3]))


if __name__ == '__main__':
    async_run()
j
I see. You're running async tasks from a sync flow, which can get a little complicated. It's best to keep flows and tasks either all async or all sync where possible, since that's the most intuitive, but it's not required. You can either make your flow async, or you can remove the await expression from
await get_datasets(workspace)
->
get_datasets(workspace)
and it would work how you would expect
d
@Jake Kaplan Alternatively, can I start the flow as async here? Starting it as a sync flow wasn't an intentional decision.
j
Yes 100%!