Daniel Lyons
04/04/2024, 8:36 PM
[...] `Task.map` and get results as they become available rather than in order, as with `asyncio.as_completed`? I'm wondering if I can implement a kind of map-reduce operation with the reductions happening as results become available rather than in order.
Marvin
04/04/2024, 8:36 PM
[...] `State.result()` method. However, there is no direct mention of obtaining results out of order, as `asyncio.as_completed` does. Prefect's map operation creates a task run for each element of its input data, and each of these runs is an individual child task.
Retrieving results asynchronously as they become available, in the style of `asyncio.as_completed`, is not explicitly documented in the provided Prefect documentation excerpts.
The typical method for working with task results within a flow in Prefect is as follows:
```python
import asyncio

from prefect import flow, task


@task
async def my_task():
    return 1


@flow
async def my_flow():
    # Running the task and capturing its state
    state = await my_task(return_state=True)
    # Awaiting the result from the state
    result = await state.result(fetch=True)
    return result + 1


# First, we define an async main function
async def main():
    state = await my_flow(return_state=True)
    assert await state.result(fetch=True) == 2
    return state


# Running the main coroutine to execute the flow
asyncio.run(main())
```
While this snippet (Resolving results) doesn't show retrieving task results out of order, you may be able to use Python's asynchronous concurrency features inside the flow function to retrieve results in a non-blocking way that approximates `asyncio.as_completed` behavior.
If you would like to explore this further, you may need to use Python's native `asyncio` library alongside Prefect to implement a map-reduce operation in which reductions happen as results become available.
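As a rough illustration of the `asyncio.as_completed` behavior referred to here, the sketch below uses only the standard library and no Prefect at all. The `work` coroutine and the `completion_order` helper are hypothetical names invented for this example; `work` simply stands in for a mapped task.

```python
import asyncio


async def work(delay: float) -> float:
    # Hypothetical stand-in for a mapped task: sleeps, then returns its input.
    await asyncio.sleep(delay)
    return delay


async def completion_order(delays: list[float]) -> list[float]:
    # as_completed yields awaitables in the order they finish,
    # not in the order they were passed in.
    finished = []
    for next_done in asyncio.as_completed([work(d) for d in delays]):
        finished.append(await next_done)
    return finished


if __name__ == "__main__":
    # The shortest sleep finishes first, regardless of input position.
    print(asyncio.run(completion_order([0.3, 0.1, 0.2])))
```

By contrast, `asyncio.gather` would return the same values in input order, which is why `as_completed` is the closer fit for reducing results as they arrive.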
For more complex workflows or custom behaviors outside the standard use of Prefect's tasks and flows, it is advisable to look into the Prefect community or GitHub issues for further assistance.
Daniel Lyons
04/04/2024, 8:38 PM
Daniel Lyons
04/04/2024, 8:42 PM
Nate
04/04/2024, 9:05 PM
`.submit()` (or `.map()` to do it in bulk) will return futures that you should be able to await like `as_completed`
https://github.com/PrefectHQ/prefect/issues/8990#issuecomment-1722195709
Nate
04/04/2024, 9:05 PM
Daniel Lyons
04/04/2024, 9:05 PM
Nate
04/04/2024, 9:18 PM
```python
import asyncio

from prefect import flow, task


@task
async def sleep(duration: int) -> int:
    await asyncio.sleep(duration)
    return duration


@flow
async def as_completed_flow() -> None:
    # Each future.result() is awaitable here, so as_completed can yield
    # them in the order the mapped task runs finish.
    for result in asyncio.as_completed(
        [future.result() for future in await sleep.map(range(5))]
    ):
        print(await result)


if __name__ == "__main__":
    asyncio.run(as_completed_flow())
```
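To connect this back to the original map-reduce question, here is a minimal sketch of folding results into an accumulator as they complete. It uses plain coroutines so it runs without Prefect; `reduce_as_completed` and `square` are hypothetical helpers made up for this example, not part of Prefect or asyncio. In the flow above, the list of `future.result()` awaitables could presumably be passed in place of the plain coroutines.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def reduce_as_completed(
    awaitables: Iterable[Awaitable[T]],
    reducer: Callable[[R, T], R],
    initial: R,
) -> R:
    # Hypothetical helper: applies the reducer to each result as soon as
    # it is available, in completion order rather than input order.
    accumulator = initial
    for next_done in asyncio.as_completed(list(awaitables)):
        accumulator = reducer(accumulator, await next_done)
    return accumulator


async def square(n: int) -> int:
    # Stand-in for a mapped task; larger inputs finish sooner here.
    await asyncio.sleep(0.01 * (5 - n))
    return n * n


async def main() -> int:
    return await reduce_as_completed(
        (square(n) for n in range(5)),
        lambda total, value: total + value,
        0,
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because results are folded in completion order, this only gives a deterministic answer when the reducer is commutative and associative (a sum, a max, a set union); an order-sensitive reduction would still need the results in input order.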
Daniel Lyons
04/04/2024, 9:18 PM
Nate
04/04/2024, 9:19 PM