https://prefect.io logo
#prefect-community
Title
# prefect-community
a

Ahmed Ezzat

04/21/2022, 3:05 PM
What is the equivalent of
gather
in prefect 2.0. I'm aware that tasks return instance of
PrefectFuture
what I want is a way to get finished futures as soon as it completes, keep in mind I'm talking about a list of futures not just one
discourse 1
1
k

Kevin Kho

04/21/2022, 3:08 PM
Maybe this, but it’s probably harder to use than just doing a list comprehension
Copy code
[x.wait() for x in list_of_futures]
a

Ahmed Ezzat

04/21/2022, 3:09 PM
@Kevin Kho using
wait()
is blocking I'm looking for non-blocking way
meaning that if A finished and B still running this would block execution until B is finished. what I'm trying to get is some sort of a stream if A finished I get A regardless the state of B
k

Kevin Kho

04/21/2022, 3:12 PM
Ok will need to ask someone and get back to you.
a

Anna Geller

04/21/2022, 3:18 PM
you could call your async tasks and do `asyncio.gather()`:
Copy code
import asyncio
from prefect import task, flow, get_run_logger


@task
async def print_values(values):
    logger = get_run_logger()
    for value in values:
        await asyncio.sleep(1) 
        <http://logger.info|logger.info>(value)


@flow(name="async_flow")
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(async_flow())
a

Ahmed Ezzat

04/21/2022, 3:35 PM
asyncio.gather
is also blocking, it awaits all futures to finish and then returns the result. to cut it short I'm trying to figure out a hacky way around
.map
for Orion as it's really powerful and beneficial for us to have
a

Anna Geller

04/21/2022, 3:38 PM
Thanks for explaining. Mapping is on the roadmap, for now you can do the same with a for loop
z

Zanie

04/21/2022, 4:44 PM
For what it’s worth, using gather as in that example is only awaiting submission concurrently, not completion.
We don’t have an as completed feature, but that’s a great feature request. I’ll write a quick blurb as a workaround
🙏 1
Copy code
from prefect import flow, task


@task
def add(x, y):
    return x + y


def as_completed(*futures):
    futures = set(futures)
    completed = set()
    while len(completed) < len(futures):
        for future in futures.difference(completed):
            state = future.get_state()
            if state.is_completed():
                completed.add(future)
                yield state


@flow
def my_flow():
    futures = []
    # Create 100 futures
    for i in range(100):
        futures.append(add(1, i))

    for state in as_completed(*futures):
        print(state)


my_flow()
a

Ahmed Ezzat

04/22/2022, 4:31 AM
@Anna Geller could you please archive this thread on GitHub?
Thanks @Zanie it works
a

Anna Geller

04/22/2022, 10:48 AM
great to hear, I did but on Discourse here
m

Malthe Karbo

04/24/2022, 5:26 PM
Thanks for the above - I added generic type hinting to your implementation, incase you want to re-us
Copy code
from typing import Generator, Iterable, TypeVar
from prefect.futures import PrefectFuture
from prefect.orion.schemas.states import State
from prefect.utilities.asyncio import Sync


T = TypeVar("T")


def as_completed(
    futures: Iterable[PrefectFuture[T, Sync]]
) -> Generator[State[T], None, None]:
    _futures = set(futures)
    completed = set()
    while len(completed) < len(_futures):
        for future in _futures.difference(completed):
            state = future.get_state()
            if state.is_completed():
                completed.add(future)
                yield state
a

Anna Geller

04/24/2022, 5:30 PM
thanks, I added on Discourse. If you join Discourse, I'll tag you 🙂
m

Malthe Karbo

05/16/2022, 7:12 PM
Hi, incase anyone else is using this: Consider adding a retry mechanism and/or internal throttling mechanism (because this will absolutely spam either your prefect server or the prefect cloud, resulting in a 403 most likely). Here is an implementation we used
Copy code
From typing import Generator, Iterable, TypeVar
from prefect import get_run_logger
from prefect.futures import PrefectFuture
from prefect.utilities.asyncio import Sync

from tenacity import retry
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential


T = TypeVar("T")


@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=0, max=30))
def check_state(future: PrefectFuture[Any, Sync]) -> bool:
    state = future.get_state()
    return state.is_completed()


def as_completed(
    futures: Iterable[PrefectFuture[T, Sync]]
) -> Generator[PrefectFuture[T, Sync], None, None]:
    logger = get_run_logger()
    _futures = set(futures)
    completed = set()
    while len(completed) < len(_futures):
        for future in _futures.difference(completed):
            time.sleep(0.1)
            try:
                if check_state(future):
                    completed.add(future)
                    yield future
            except Exception as e:
                logger.exception(
                    f"🤬 Caught exception trying to get state from prefect api: {e}"
                )
                raise e
Apologies for choice of emoji, we like emojis in our logs 🤗
10 Views