<@ULVA73B9P> how do I await an async task list fro...
# ask-marvin
s
@Marvin how do I await an async task list from a sync flow with prefect?
m
To await an asynchronous task list from a synchronous flow in Prefect, you can use the `submit` method to run async tasks within a sync flow. Here's a basic example:
from prefect import flow, task

@task
async def my_async_task():
    pass

@flow
def my_sync_flow():
    async_results = [my_async_task.submit() for _ in range(5)]
    for result in async_results:
        # Use `result.result()` to block and get the result of the async task
        result.result()

my_sync_flow()
In this example, `my_async_task` is an asynchronous task that you want to run multiple times. The `submit` method is used to schedule these tasks, and `result.result()` is used to block and retrieve the results synchronously. You can find more about task submission and flow execution in the Prefect tasks documentation. If you have more specific requirements or configurations, feel free to ask!
s
what is the best way to gather the results from a sync flow? Calling 'result' will block the sync code
n
some_task.map(range(N)).result()
where yes, `.result()` is blocking, but you're collecting all of them at once, unlike marvin's sequentially blocking example

related

otherwise there's `as_completed` if you want that
s
@Nate as_completed from asyncio?
n
sorry, i meant `from prefect.futures`
https://github.com/prefecthq/prefect/blob/main/src/prefect/futures.py#L343-L381 so the resulting list of futures from map (or list comp of submits) can be provided to that, and then you can process them as they finish
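For readers unfamiliar with the "process them as they finish" pattern, here is a minimal sketch using the standard library's `concurrent.futures.as_completed` with a thread pool. This is an analogy only, not Prefect's API: the names `slow_square` and `collect_as_completed` are hypothetical, and in a flow the list of futures would come from `.map()` or a list comprehension of `.submit()` calls and be passed to `as_completed` from `prefect.futures` instead.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(n: int) -> int:
    # Hypothetical workload: smaller inputs sleep longer, so they tend to finish later
    time.sleep(0.05 * (3 - n))
    return n * n


def collect_as_completed() -> list[int]:
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(slow_square, n) for n in range(3)]
        # as_completed is a generator: it yields each future once it is done,
        # regardless of the order in which the futures were submitted
        return [future.result() for future in as_completed(futures)]


if __name__ == "__main__":
    print(collect_as_completed())
```

Results arrive in completion order rather than submission order, so each one can be handled without waiting for the slowest task.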
s
ohhh, generator yielding completed futures! Sweet, thanks!
I'm getting a lot of
got Future <Future pending> attached to a different loop
but only on task failures, from the .map().result() operation. It looks like prefect manages the async loops and getting the future results in certain cases is problematic?
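For context, this message is the generic asyncio error raised when a coroutine running on one event loop awaits a future that belongs to another loop. The sketch below reproduces it with the standard library alone; it does not involve Prefect and says nothing about how Prefect manages its loops. The function name `await_on_foreign_loop` is hypothetical.

```python
import asyncio


def await_on_foreign_loop() -> str:
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        # This future is bound to loop_a
        foreign_future = loop_a.create_future()

        async def waiter() -> None:
            # Awaited from a task running on loop_b, so asyncio rejects it
            await foreign_future

        try:
            loop_b.run_until_complete(waiter())
        except RuntimeError as exc:
            return str(exc)
        return "no error raised"
    finally:
        loop_a.close()
        loop_b.close()


if __name__ == "__main__":
    print(await_on_foreign_loop())
```

Any object that creates futures on the loop it was constructed with can trigger the same error when it is later used from a different loop.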
n
i'd be happy to look at an MRE!
s
working on it, thanks!
@Nate I guess if prefect creates and manages several asyncio loops internally, then my problem is a singleton that is created before the map operation, uses the current loop to build itself, and is then shared across tasks. About half of the tasks are failing with this error. Is there some hook to manage a per-loop object? My use case:
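import asyncio
import aiodns
from prefect import task

# singleton created before the map call, bound to whichever loop is current here
resolver = aiodns.DNSResolver(loop=asyncio.get_event_loop())

async def get_resolver() -> aiodns.DNSResolver:
    return resolver

@task
async def resolve(query: str):
    # "A" record type is an assumption; the original snippet did not specify one
    shared = await get_resolver()
    return await shared.query(query, "A")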
I'll try to move the singleton into a subtask instead of a plain function and cache the resolver there; if subtasks are scheduled on the same asyncio loop, this should work
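One generic way to get a per-loop object, independent of any Prefect hook, is to key a cache on the running event loop so that each loop lazily builds its own instance. A minimal sketch under stated assumptions: `FakeResolver` is a hypothetical stand-in for the aiodns resolver so the example runs without network access, and `get_resolver` and `lookup` are illustrative names, not part of any library.

```python
import asyncio
import weakref


class FakeResolver:
    """Hypothetical stand-in for a loop-bound client such as an aiodns resolver."""

    async def query(self, name: str) -> str:
        await asyncio.sleep(0)
        return f"resolved:{name}"


# One resolver per event loop; an entry disappears once its loop is garbage collected.
# A real resolver that holds a strong reference to its loop would keep its entry alive.
_resolvers = weakref.WeakKeyDictionary()


def get_resolver() -> FakeResolver:
    # Must be called from inside a coroutine, so the running loop is well defined
    loop = asyncio.get_running_loop()
    resolver = _resolvers.get(loop)
    if resolver is None:
        resolver = FakeResolver()
        _resolvers[loop] = resolver
    return resolver


async def lookup(name: str) -> str:
    return await get_resolver().query(name)


if __name__ == "__main__":
    print(asyncio.run(lookup("example.com")))
```

Because the resolver is looked up at call time rather than built at import time, a task always uses an instance created on the loop it is actually running on.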