Geoff Irons
03/24/2025, 10:45 PM
How do I await a map call over an async task?
i.e.:
@task()
async def my_task(i):
    return i * 2

@flow
def my_flow():
    inputs = [1, 2, 3, 4]
    outputs = my_task.map(inputs)  # how do I await this?
Marvin
03/24/2025, 10:46 PM
To await a map call over an asynchronous task in Prefect 3.x, you can use the result() method on the list of futures returned by the map call. This approach is similar to handling futures in concurrent programming.
Here's how you can modify your flow to await the results:
```python
from prefect import flow, task

@task
async def my_task(i):
    return i * 2

@flow
async def my_flow():
    inputs = [1, 2, 3, 4]
    futures = my_task.map(inputs)
    outputs = await futures.result()  # Await the results
    return outputs
```
You can read more about task runners and mapping in Prefect 3.x in the documentation.
Geoff Irons
03/24/2025, 10:48 PM
TypeError: object list can't be used in 'await' expression
The return type of result() is list[CoroutineType[Any, Any, None]], which cannot be awaited.
Marvin
03/24/2025, 10:49 PM
[...] asyncio.gather. Here's how you can modify your flow to correctly await the results:
```python
import asyncio
from prefect import flow, task

@task
async def my_task(i):
    return i * 2

@flow
async def my_flow():
    inputs = [1, 2, 3, 4]
    futures = my_task.map(inputs)
    # Gather the results from the list of coroutines
    outputs = await asyncio.gather(*[future.aresult() for future in futures])
    return outputs
```
The aresult() method is used to asynchronously retrieve the result from each PrefectFuture. You can read more about handling futures in Prefect's source code.
Geoff Irons
03/24/2025, 10:53 PM
await asyncio.gather(*[f.result() for f in my_task.map(inputs)])
Satisfies the type checker, but gives the result:
TypeError: An asyncio.Future, a coroutine or an awaitable is required
There is no aresult() function as far as I can find.
Marvin
03/24/2025, 10:53 PM
[...] result() method on the PrefectFuture. The result() method is indeed blocking and should not be awaited directly within an async context. Instead, you should ensure that you are correctly handling the Prefect futures.
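Both TypeErrors reported earlier in this thread can be reproduced with the standard library alone. The sketch below is an analogy, not Prefect code: it uses `concurrent.futures.Future` as a stand-in for a future whose `result()` blocks and returns a plain value, and the helper names (`make_done_future`, `raises_type_error`, and so on) are hypothetical. It assumes Prefect futures behave the same way here, which the error messages suggest but the thread does not confirm.

```python
import asyncio
from concurrent.futures import Future


def make_done_future(value):
    """Return an already-completed stand-in future (hypothetical helper)."""
    fut = Future()
    fut.set_result(value)
    return fut


async def await_a_list(futures):
    # result() blocks and returns plain values, so this is a list of ints.
    values = [f.result() for f in futures]
    return await values  # a list is not awaitable, so this raises TypeError


async def gather_plain_values(futures):
    # gather() expects awaitables; plain ints are rejected with TypeError.
    return await asyncio.gather(*[f.result() for f in futures])


def raises_type_error(coro_fn, futures):
    """Run coro_fn(futures) and report whether it raised TypeError."""
    try:
        asyncio.run(coro_fn(futures))
    except TypeError:
        return True
    return False


stand_ins = [make_done_future(i * 2) for i in [1, 2, 3, 4]]
print(raises_type_error(await_a_list, stand_ins))
print(raises_type_error(gather_plain_values, stand_ins))
```

In both cases the values are already available once `result()` returns, so nothing is left to await.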
Here is a revised approach to handle the results from a map call over an async task:
```python
from prefect import flow, task
import asyncio

@task
async def my_task(i):
    return i * 2

@flow
async def my_flow():
    inputs = [1, 2, 3, 4]
    futures = my_task.map(inputs)
    # Gather results using asyncio.gather
    outputs = await asyncio.gather(*[future.result() for future in futures])
    return outputs
```
If this still does not work, you may need to manually resolve each future in a loop, ensuring that each task has completed before attempting to retrieve results:
```python
from prefect import flow, task
import asyncio

@task
async def my_task(i):
    return i * 2

@flow
async def my_flow():
    inputs = [1, 2, 3, 4]
    futures = my_task.map(inputs)
    outputs = []
    for future in futures:
        await future.wait()  # Ensure the future is complete
        outputs.append(future.result())
    return outputs
```
This method uses await future.wait() to ensure that each future is completed before calling result(). If you continue to encounter issues, it might be worth verifying the Prefect version and checking for any specific changes in how async tasks are handled in Prefect's latest documentation.
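For comparison, here is one general way to collect values from futures whose `result()` blocks, while staying inside an async function. This is a minimal sketch using only the standard library: `concurrent.futures` futures stand in for the mapped task futures, and `double`, `collect_results`, and `run_example` are hypothetical names. Whether the same pattern applies unchanged to Prefect futures is an assumption that has not been verified here. If `result()` on the mapped futures simply blocks and returns a list, as the first TypeError in this thread suggests, calling it without `await` may already be enough.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def double(i):
    # Stand-in for the task body from the thread.
    return i * 2


async def collect_results(futures):
    # asyncio.to_thread runs each blocking result() call in a worker thread
    # and returns an awaitable, which gather() accepts. The output order
    # matches the order of the futures passed in.
    return await asyncio.gather(*[asyncio.to_thread(f.result) for f in futures])


def run_example(inputs):
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(double, i) for i in inputs]
        return asyncio.run(collect_results(futures))


outputs = run_example([1, 2, 3, 4])
print(outputs)  # [2, 4, 6, 8]
```

The point of `asyncio.to_thread` is that the event loop stays free while each `result()` call waits. It requires Python 3.9 or later.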