Ahmed Ezzat
04/21/2022, 3:05 PMgather
in Prefect 2.0. I'm aware that tasks return an instance of PrefectFuture.
What I want is a way to get finished futures as soon as they complete. Keep in mind I'm talking about a list of futures, not just one.
Kevin Kho
04/21/2022, 3:08 PM[x.wait() for x in list_of_futures]
Ahmed Ezzat
04/21/2022, 3:09 PMwait()
is blocking; I'm looking for a non-blocking way.
Kevin Kho
04/21/2022, 3:12 PMAnna Geller
04/21/2022, 3:18 PMimport asyncio
from prefect import task, flow, get_run_logger
@task
async def print_values(values):
    """Log each item of ``values`` at INFO level, pausing one second before each.

    Args:
        values: Any iterable; every element is passed to ``logger.info`` as-is.
    """
    logger = get_run_logger()
    for value in values:
        # Awaiting the sleep hands control back to the event loop, so several
        # runs of this task that are awaited together can interleave output.
        await asyncio.sleep(1)
        logger.info(value)
@flow(name="async_flow")
async def async_flow():
    """Run ``print_values`` once on its own, then two more calls together."""
    # Awaited by itself, so this call finishes before the next ones are created.
    await print_values([1, 2])
    # Both calls are created first and then awaited as a group via gather.
    await asyncio.gather(print_values("abcd"), print_values("6789"))
if __name__ == "__main__":
    # Script entry point: drive the async flow to completion with asyncio.run.
    asyncio.run(async_flow())
Ahmed Ezzat
04/21/2022, 3:35 PMasyncio.gather
is also blocking: it awaits all futures to finish and then returns the results. To cut it short, I'm trying to figure out a hacky way around .map
for Orion, as it's really powerful and beneficial for us to have.
Anna Geller
04/21/2022, 3:38 PMZanie
04/21/2022, 4:44 PMfrom prefect import flow, task
@task
def add(x, y):
    """Return ``x + y``."""
    total = x + y
    return total
def as_completed(*futures, poll_interval=0.1):
    """Yield the state of each future once that state reports completion.

    Every pending future is polled with ``get_state()``; as soon as the
    returned state answers ``is_completed()`` with a truthy value, the state
    is yielded and the future is not polled again. Duplicate futures are
    collapsed, so each distinct future contributes at most one state.

    Args:
        *futures: Objects exposing ``get_state()``; the returned state must
            expose ``is_completed()``.
        poll_interval: Seconds to sleep after a full pass in which no future
            finished, so the loop does not spin continuously while waiting.
            Pass ``0`` to poll without pausing.

    Yields:
        The state object of each future, in the order completion is observed.

    Note:
        A future whose state never reports ``is_completed()`` keeps this
        generator polling indefinitely.
    """
    import time  # local import: the surrounding snippet does not import it

    pending = set(futures)
    while pending:
        progressed = False
        # Iterate over a snapshot so `pending` can shrink during the pass.
        for future in tuple(pending):
            state = future.get_state()
            if state.is_completed():
                pending.discard(future)
                progressed = True
                yield state
        # Back off only when a whole pass found nothing new.
        if pending and not progressed:
            time.sleep(poll_interval)
@flow
def my_flow():
    """Call ``add`` 100 times and print each state that ``as_completed`` yields."""
    # Create 100 futures
    submitted = [add(1, i) for i in range(100)]
    for finished_state in as_completed(*submitted):
        print(finished_state)
# Invoke the flow directly when this snippet is executed.
my_flow()
Ahmed Ezzat
04/22/2022, 4:31 AMAnna Geller
04/22/2022, 10:48 AMMalthe Karbo
04/24/2022, 5:26 PMfrom typing import Generator, Iterable, TypeVar
from prefect.futures import PrefectFuture
from prefect.orion.schemas.states import State
from prefect.utilities.asyncio import Sync
T = TypeVar("T")
def as_completed(
    futures: Iterable[PrefectFuture[T, Sync]],
    poll_interval: float = 0.1,
) -> Generator[State[T], None, None]:
    """Yield the state of each future once that state reports completion.

    Every pending future is polled with ``get_state()``; when the returned
    state answers ``is_completed()`` with a truthy value, the state is yielded
    and the future is dropped from further polling. Duplicate futures in the
    input are collapsed.

    Args:
        futures: The futures to watch.
        poll_interval: Seconds to sleep after a full pass in which no future
            finished, so the loop does not spin continuously while waiting.
            Pass ``0`` to poll without pausing.

    Yields:
        The state of each future, in the order completion is observed.

    Note:
        A future whose state never reports ``is_completed()`` keeps this
        generator polling indefinitely.
    """
    import time  # local import: the surrounding snippet does not import it

    pending = set(futures)
    while pending:
        progressed = False
        # Iterate over a snapshot so `pending` can shrink during the pass.
        for future in tuple(pending):
            state = future.get_state()
            if state.is_completed():
                pending.discard(future)
                progressed = True
                yield state
        # Back off only when a whole pass found nothing new.
        if pending and not progressed:
            time.sleep(poll_interval)
Anna Geller
04/24/2022, 5:30 PMMalthe Karbo
05/16/2022, 7:12 PM
import time
from typing import Any, Generator, Iterable, TypeVar

from prefect import get_run_logger
from prefect.futures import PrefectFuture
from prefect.utilities.asyncio import Sync
from tenacity import retry
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential
T = TypeVar("T")
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=0, max=30))
def check_state(future: PrefectFuture[Any, Sync]) -> bool:
    """Report whether the future's current state says it is completed.

    The lookup is wrapped in a tenacity ``retry`` configured to stop after
    five attempts, with an exponential wait bounded between 0 and 30.
    """
    return future.get_state().is_completed()
def as_completed(
    futures: Iterable[PrefectFuture[T, Sync]],
    poll_interval: float = 0.1,
) -> Generator[PrefectFuture[T, Sync], None, None]:
    """Yield each future once ``check_state`` reports it as completed.

    Pending futures are checked one at a time, with a pause before every
    check. Duplicate futures in the input are collapsed.

    Args:
        futures: The futures to watch.
        poll_interval: Seconds to sleep before each individual state check
            (defaults to the previously hard-coded 0.1).

    Yields:
        Each future, in the order its completion is observed.

    Raises:
        Exception: Whatever ``check_state`` raises is logged through the run
            logger and then re-raised unchanged.

    Note:
        A future for which ``check_state`` never returns a truthy value keeps
        this generator polling indefinitely.
    """
    logger = get_run_logger()
    pending = set(futures)
    while pending:
        # Iterate over a snapshot so `pending` can shrink during the pass.
        for future in tuple(pending):
            # Pause before every lookup, as the original did; presumably this
            # throttles calls to the API -- confirm before tuning.
            time.sleep(poll_interval)
            # Only the lookup is guarded, so an exception thrown into the
            # generator at the yield is not mislabelled as an API failure.
            try:
                done = check_state(future)
            except Exception as e:
                logger.exception(
                    f"🤬 Caught exception trying to get state from prefect api: {e}"
                )
                raise
            if done:
                pending.discard(future)
                yield future
Apologies for the choice of emoji; we like emojis in our logs 🤗