# marvin-in-the-wild
d
@Marvin is it possible to take the list of futures returned by `Task.map` and get results as they become available rather than in order, as with `asyncio.as_completed`? I'm wondering if I can implement a kind of map-reduce operation with the reductions happening as results become available rather than in order.
m
According to the Prefect documentation, when working with flow and task states, the result can be retrieved with the `State.result()` method. However, there is no direct mention of obtaining results out of order in the way `asyncio.as_completed` does. Prefect's map operation creates a task run for each element of its input data, and each of these runs is an individual child task. Retrieving results as they become available, in the style of `asyncio.as_completed`, is not explicitly documented in the provided Prefect documentation excerpts. The typical method for working with task results within a flow in Prefect is as follows:
import asyncio

from prefect import flow, task

@task
async def my_task():
    return 1

@flow
async def my_flow():
    # Running the task and capturing its state
    state = await my_task(return_state=True)
    # Awaiting the result from the state
    result = await state.result(fetch=True)
    return result + 1

# First, we define an async main function
async def main():
    state = await my_flow(return_state=True)
    assert await state.result(fetch=True) == 2
    return state

# Running the main coroutine to execute the flow
asyncio.run(main())
While this snippet doesn't show resolving task results out of order, you may be able to use Python's asynchronous concurrency features within the flow function to retrieve results in a non-blocking manner that simulates `asyncio.as_completed` behavior. If you would like to explore this further, you might have to use Python's native `asyncio` library features alongside Prefect to implement your map-reduce operation, so that reductions happen as results become available. For more complex workflows or custom behaviors outside the standard use of Prefect's tasks and flows, it is advisable to look into the Prefect community or GitHub issues for further assistance.
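To make the "reduce as results arrive" idea concrete, here is a minimal sketch using only the standard library, with no Prefect involved. The names `work` and `reduce_as_completed` are hypothetical stand-ins (a fake mapped task and a running-sum reduction), not part of any Prefect API; whether Prefect futures can be dropped in where the coroutines are is an assumption to verify against your Prefect version.

```python
import asyncio


async def work(duration: float) -> float:
    # Hypothetical stand-in for a mapped task: sleep, then return the input.
    await asyncio.sleep(duration)
    return duration


async def reduce_as_completed(durations: list[float]) -> tuple[float, list[float]]:
    total = 0.0                 # running reduction (here: a sum)
    order: list[float] = []     # records the order in which results arrived
    # as_completed yields awaitables in completion order, not input order.
    for next_done in asyncio.as_completed([work(d) for d in durations]):
        value = await next_done
        total += value          # reduce immediately, without waiting for the rest
        order.append(value)
    return total, order


if __name__ == "__main__":
    total, order = asyncio.run(reduce_as_completed([0.15, 0.05, 0.1]))
    print(total, order)
```

The reduction step runs inside the loop, so each result is folded in as soon as its coroutine finishes; the shortest sleep is reduced first even though it was not first in the input list.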
d
I found [this issue](https://github.com/PrefectHQ/prefect/issues/8990) which seems related. It concludes with something along the lines of it now being possible to implement this oneself, but without any sample code or link to new APIs that might indicate how.
n
one thing marvin's missing is that `.submit()` (or `.map()` to do it in bulk) will return futures that you should be able to await like as_completed https://github.com/PrefectHQ/prefect/issues/8990#issuecomment-1722195709
let's see if I can make an example
d
thank you @Nate!
n
import asyncio

from prefect import flow, task


@task
async def sleep(duration: int) -> int:
    await asyncio.sleep(duration)
    return duration


@flow
async def as_completed_flow() -> None:
    for result in asyncio.as_completed(
        [future.result() for future in await sleep.map(range(5))]
    ):
        print(await result)


if __name__ == "__main__":
    asyncio.run(as_completed_flow())
d
this is great, thank you so much for the help!
n
catjam