What is the equivalent of `gather` in prefect 2.0....
# prefect-community
What is the equivalent of
in prefect 2.0. I'm aware that tasks return instance of
what I want is a way to get finished futures as soon as it completes, keep in mind I'm talking about a list of futures not just one
Maybe this, but it’s probably harder to use than just doing a list comprehension
[x.wait() for x in list_of_futures]
@Kevin Kho using
is blocking I'm looking for non-blocking way
meaning that if A finished and B still running this would block execution until B is finished. what I'm trying to get is some sort of a stream if A finished I get A regardless the state of B
Ok will need to ask someone and get back to you.
you could call your async tasks and do `asyncio.gather()`:
import asyncio
from prefect import task, flow, get_run_logger

async def print_values(values):
    logger = get_run_logger()
    for value in values:
        await asyncio.sleep(1) 

async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]
    await asyncio.gather(*coros)

if __name__ == "__main__":
is also blocking, it awaits all futures to finish and then returns the result. to cut it short I'm trying to figure out a hacky way around
for Orion as it's really powerful and beneficial for us to have
Thanks for explaining. Mapping is on the roadmap, for now you can do the same with a for loop
For what it’s worth, using gather as in that example is only awaiting submission concurrently, not completion.
We don’t have an as completed feature, but that’s a great feature request. I’ll write a quick blurb as a workaround
from prefect import flow, task

def add(x, y):
    return x + y

def as_completed(*futures):
    futures = set(futures)
    completed = set()
    while len(completed) < len(futures):
        for future in futures.difference(completed):
            state = future.get_state()
            if state.is_completed():
                yield state

def my_flow():
    futures = []
    # Create 100 futures
    for i in range(100):
        futures.append(add(1, i))

    for state in as_completed(*futures):

@Anna Geller could you please archive this thread on GitHub?
Thanks @Zanie it works
great to hear, I did but on Discourse here
Thanks for the above - I added generic type hinting to your implementation, incase you want to re-us
from typing import Generator, Iterable, TypeVar
from prefect.futures import PrefectFuture
from prefect.orion.schemas.states import State
from prefect.utilities.asyncio import Sync

T = TypeVar("T")

def as_completed(
    futures: Iterable[PrefectFuture[T, Sync]]
) -> Generator[State[T], None, None]:
    _futures = set(futures)
    completed = set()
    while len(completed) < len(_futures):
        for future in _futures.difference(completed):
            state = future.get_state()
            if state.is_completed():
                yield state
thanks, I added on Discourse. If you join Discourse, I'll tag you 🙂
Hi, incase anyone else is using this: Consider adding a retry mechanism and/or internal throttling mechanism (because this will absolutely spam either your prefect server or the prefect cloud, resulting in a 403 most likely). Here is an implementation we used
From typing import Generator, Iterable, TypeVar
from prefect import get_run_logger
from prefect.futures import PrefectFuture
from prefect.utilities.asyncio import Sync

from tenacity import retry
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential

T = TypeVar("T")

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=0, max=30))
def check_state(future: PrefectFuture[Any, Sync]) -> bool:
    state = future.get_state()
    return state.is_completed()

def as_completed(
    futures: Iterable[PrefectFuture[T, Sync]]
) -> Generator[PrefectFuture[T, Sync], None, None]:
    logger = get_run_logger()
    _futures = set(futures)
    completed = set()
    while len(completed) < len(_futures):
        for future in _futures.difference(completed):
                if check_state(future):
                    yield future
            except Exception as e:
                    f"🤬 Caught exception trying to get state from prefect api: {e}"
                raise e
Apologies for choice of emoji, we like emojis in our logs 🤗