Derek Chase
10/13/2023, 12:28 PM
async def process_workspace(workspace):
    datasets_of_workspace = await get_datasets(workspace)  # error here
    ...
@task
async def get_datasets(workspace):
    # Body elided in the original message; only the final return is shown.
    ...
    return datasets
refreshing_datasets = await asyncio.gather( *[process_workspace(workspace) for workspace in workspaces])
And here is the error
TypeError: object list can't be used in 'await' expression
Jake Kaplan
10/13/2023, 1:49 PM
import asyncio
from prefect import flow, task
async def process_workspace(workspace):
    """Await the ``get_datasets`` task for one workspace and return its result."""
    return await get_datasets(workspace)
@task
async def get_datasets(workspace):
    """Prefect task used by the example: return ``workspace + 1``."""
    return workspace + 1
@flow
async def my_flow():
    """Gather ``process_workspace`` over workspaces 1, 2, 3 and print each result."""
    workspace_ids = [1, 2, 3]
    pending = [process_workspace(ws) for ws in workspace_ids]
    results = await asyncio.gather(*pending)
    for result in results:
        print(result)
# Script entry point: run the async flow on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(my_flow())
Derek Chase
10/13/2023, 2:23 PM
from prefect import flow, task
import asyncio
@task
async def get_datasets(workspace):
    """Prefect task stub: ignore ``workspace`` and return the fixed list ``[1, 2, 3]``."""
    return [1, 2, 3]
async def process_workspace(workspace):
    """Await the ``get_datasets`` task for one workspace and return its result."""
    return await get_datasets(workspace)
async def run_async_refresh(workspaces: list):
    """Gather ``process_workspace`` over every workspace and return the results."""
    pending = [process_workspace(ws) for ws in workspaces]
    return await asyncio.gather(*pending)
async def run(workspaces):
    """Await ``run_async_refresh`` for ``workspaces``; the results are not returned."""
    await run_async_refresh(workspaces)
@flow(name='pbi-refresh-all-datasets-flow')
def async_run():
    """Synchronous Prefect flow that runs ``run`` via ``asyncio.run``."""
    workspace_ids = [1, 2, 3]
    asyncio.run(run(workspaces=workspace_ids))
# Script entry point: invoke the synchronous flow directly.
if __name__ == '__main__':
    async_run()
Jake Kaplan
10/13/2023, 2:34 PM
Change `await get_datasets(workspace)`
-> `get_datasets(workspace)`
and it would work how you would expect.
Derek
10/13/2023, 6:33 PM
Jake Kaplan
10/13/2023, 6:37 PM