<@ULVA73B9P> How do I retrieve the results from a ...
# ask-marvin
g
@Marvin How do I retrieve the results from a `map` call over an async task? i.e.:
```python
from prefect import flow, task

@task()
async def my_task(i):
    return i * 2

@flow
def my_flow():
    inputs = [1, 2, 3, 4]
    outputs = my_task.map(inputs)  # how do I await this?
```
m
To retrieve results from a `map` call over an asynchronous task in Prefect 3.x, you can use the `result()` method on the list of futures returned by the `map` call. This approach is similar to handling futures in concurrent programming. Here's how you can modify your flow to await the results:
```python
from prefect import flow, task

@task
async def my_task(i):
    return i * 2

@flow
async def my_flow():
    inputs = [1, 2, 3, 4]
    futures = my_task.map(inputs)
    outputs = await futures.result()  # Await the results
    return outputs
```
You can read more about task runners and mapping in Prefect 3.x in the documentation.
g
@Marvin It would be nice if that worked! Doing so gives me the error:
```
TypeError: object list can't be used in 'await' expression
```
The return type of `result()` is `list[CoroutineType[Any, Any, None]]`, which cannot be awaited.
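Note: the error above can be reproduced without Prefect. The following is a minimal stdlib-only sketch, assuming the value being awaited is an ordinary Python list; `await_a_list` and `values` are hypothetical names used only for illustration, and the exact wording of the message may vary between Python versions.

```python
import asyncio

async def await_a_list():
    # Stand-in for a list of already-computed results (an assumption for this sketch).
    values = [2, 4, 6, 8]
    try:
        await values  # a plain list does not implement __await__
    except TypeError as exc:
        return str(exc)
    return None

message = asyncio.run(await_a_list())
print(message)
```

If the object handed to `await` is a list, the fix is to stop awaiting it rather than to wrap it in something else.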
m
It seems I made a mistake in my previous response. To handle a list of coroutines in Prefect, you should first gather them using `asyncio.gather`. Here's how you can modify your flow to correctly await the results:
```python
import asyncio
from prefect import flow, task

@task
async def my_task(i):
    return i * 2

@flow
async def my_flow():
    inputs = [1, 2, 3, 4]
    futures = my_task.map(inputs)
    # Gather the results from the list of coroutines
    outputs = await asyncio.gather(*[future.aresult() for future in futures])
    return outputs
```
The `aresult()` method is used to asynchronously retrieve the result from each `PrefectFuture`. You can read more about handling futures in Prefect's source code.
g
@Marvin It would be nice if that worked! Doing:
```python
await asyncio.gather(*[f.result() for f in my_task.map(inputs)])
```
satisfies the type checker, but gives the result:
```
TypeError: An asyncio.Future, a coroutine or an awaitable is required
```
There is no `aresult()` function as far as I can find.
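Note: this second error is what `asyncio.gather` raises when it is handed plain values instead of awaitables. A minimal stdlib-only sketch of the difference, assuming the list passed to `gather` already holds integers; `double`, `gather_plain_values`, and `gather_coroutines` are hypothetical names for illustration.

```python
import asyncio

async def double(i):
    return i * 2

async def gather_plain_values():
    # Integers are not awaitable, so gather rejects them.
    try:
        return await asyncio.gather(*[2, 4, 6, 8])
    except TypeError as exc:
        return exc

async def gather_coroutines():
    # Coroutine objects are awaitable, so gather runs them and collects the results in order.
    return await asyncio.gather(*[double(i) for i in [1, 2, 3, 4]])

print(repr(asyncio.run(gather_plain_values())))
print(asyncio.run(gather_coroutines()))  # [2, 4, 6, 8]
```

This suggests that `f.result()` is returning finished values rather than coroutines at runtime, whatever the type checker reports.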
m
It seems there was a misunderstanding regarding the use of the `result()` method on the `PrefectFuture`. The `result()` method is indeed blocking and should not be awaited directly within an `async` context. Instead, you should ensure that you are correctly handling the Prefect futures. Here is a revised approach to handle the results from a `map` call over an async task:
```python
from prefect import flow, task

@task
async def my_task(i):
    return i * 2

@flow
async def my_flow():
    inputs = [1, 2, 3, 4]
    futures = my_task.map(inputs)
    # result() is blocking and returns plain values, so it is called
    # directly, without await or asyncio.gather
    outputs = [future.result() for future in futures]
    return outputs
```
If this still does not work, you may need to manually resolve each future in a loop, ensuring that each task has completed before attempting to retrieve results:
```python
from prefect import flow, task

@task
async def my_task(i):
    return i * 2

@flow
async def my_flow():
    inputs = [1, 2, 3, 4]
    futures = my_task.map(inputs)
    outputs = []
    for future in futures:
        future.wait()  # Blocks until the future is complete; not awaited
        outputs.append(future.result())
    return outputs
```
This method uses `future.wait()` to ensure that each future is completed before calling `result()`. Both calls block, so neither is awaited. If you continue to encounter issues, it might be worth verifying the Prefect version and checking for any specific changes in how async tasks are handled in Prefect's latest documentation.
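Note: the blocking wait-then-result pattern can be tried in isolation with the standard library. This is only a sketch under stated assumptions: `concurrent.futures.Future` stands in for a blocking future and is not Prefect's `PrefectFuture`, and `double` and `collect_results` are hypothetical names. Whether Prefect's own `wait()` and `result()` behave the same way should be checked against the installed version.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor, wait

def double(i):
    return i * 2

async def collect_results(inputs):
    # Work runs in worker threads; the futures expose blocking wait/result calls.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(double, i) for i in inputs]
        wait(futures)  # blocking call, so it is not awaited
        return [f.result() for f in futures]  # plain values, in submission order

outputs = asyncio.run(collect_results([1, 2, 3, 4]))
print(outputs)  # [2, 4, 6, 8]
```

Blocking calls like these stall the event loop while they wait, which is acceptable in a short sketch but worth keeping in mind in a flow that does other async work.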