<@ULVA73B9P> , Is there an `as_completed` method o...
# ask-marvin
j
@Marvin , Is there an
as_completed
method or something in Prefect-Dask to be able to loop through PrefectFutures as they complete?
m
While the documentation excerpts don't directly mention an
as_completed
method, they do reference the use of
PrefectFuture
, which is an object that provides access to a computation happening in a task runner. Typically, when dealing with Dask, you might use the as_completed function to iterate over futures as they complete. This capability may not be directly exposed through Prefect's interface based on the documentation excerpts. However, with Prefect and Dask integration, you can submit tasks within a
DaskTaskRunner
flow, and Prefect handles the task dependencies automatically. In the code example provided in the snippet, tasks are submitted and Prefect manages their execution order according to dependencies, but it does not show a looping mechanism over completed futures. For more information on how Prefect integrates with Dask, please refer to the Prefect Dask documentation here. For general concepts around
PrefectFuture
and task runners, you can read more in the Prefect documentation here. If you need to process results as they complete, you may have to fetch the futures' results and handle them manually within your flow or use Dask's methods directly by interacting with the `DaskTaskRunner`'s underlying Dask client.
n
this should work independently of which
task_runner
you choose
Copy code
import asyncio

from prefect import flow, task


@task
async def sleep(duration: int) -> int:
    await asyncio.sleep(duration)
    return duration


@flow
async def as_completed_flow() -> None:
    for result in asyncio.as_completed(
        [future.result() for future in await sleep.map(range(5))]
    ):
        print(await result)


if __name__ == "__main__":
    asyncio.run(as_completed_flow())
πŸ‘€ 1
j
Hey Nate, sorry meant to ask sooner, is this example only work as asyncio I take it? 😞
n
hey @Jack P - as opposed to something sync instead?
j
I am using Dask Task Runner on sync api request . It’s like a streaming endpoint so I used dask instead of asyncio. I mainly wanted to be able to just take the results of the tasks I fire off as they finish, instead of just gathering futures and looping through result.
n
hmm - I may be missing some context.
sync api request
does it have to be sync? based on this
loop through PrefectFutures as they complete
I feel like I might want to make things async and use
as_completed
something like the above
j
ahh I see what you're saying, I'll see if I can make it async πŸ™ ty
n
sure! lmk if it doesnt work out
πŸ™ 1