Selora
02/13/2025, 3:32 AM
`map` also seems to block the execution and wait for all futures to be completed.
```python
from time import sleep
import random

from prefect import flow, task
from prefect.futures import as_completed

@task()
def get_number(id: int):
    sleep(random.uniform(2.0, 8.0))
    number = random.randint(0, 1000)
    print(f"{id}, fetching {number}")
    return number

@task()
def print_hello(id: int, number: int):
    print(f"hello {id}, you got {number}")
    return True

@flow()
def master_flow(parallel_tasks: int):
    numbers = get_number.map(range(parallel_tasks))
    t = []
    for id, num in zip(range(parallel_tasks), as_completed(numbers)):
        t.append(print_hello.submit(id, num.result()))
    # Imaginary task #3 that uses the same pattern to yield completed futures from print_hello here.
    # This wouldn't start until _all_ print_hello tasks are submitted,
    # even though some print_hello futures might already be done!
```
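One detail worth checking in this pattern: `zip(range(parallel_tasks), as_completed(numbers))` pairs each *completed* future with a running counter, so the `id` handed to `print_hello` is the completion order rather than the `id` that produced that number. The Python `concurrent.futures` documentation keeps that association with a dict from future to input. Below is a minimal stdlib sketch of that idiom; it is only an analogue, not Prefect API, and `fetch_number` and `run` are hypothetical stand-ins (the fetched value is derived from the id so the pairing can be checked).

```python
from __future__ import annotations

import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_number(item_id: int) -> int:
    """Hypothetical stand-in for get_number: sleeps briefly, returns a value derived from item_id."""
    time.sleep(random.uniform(0.01, 0.05))
    return item_id * 10


def run(n: int) -> list[tuple[int, int]]:
    pairs = []
    with ThreadPoolExecutor(max_workers=n) as executor:
        # Map each future back to the input that produced it.
        futures = {executor.submit(fetch_number, i): i for i in range(n)}
        for fut in as_completed(futures):
            item_id = futures[fut]  # the real producer id, not the completion index
            pairs.append((item_id, fut.result()))
    return pairs


if __name__ == "__main__":
    for item_id, number in run(5):
        print(f"hello {item_id}, you got {number}")
```

Results still arrive in completion order, but each one carries the id it actually belongs to.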
I guess splitting that part into async tasks that yield and also return futures could maybe work?
Tim Galvin
02/13/2025, 7:32 AM
There is a `wait_for` keyword argument in `submit` and `map`.
```python
for idx, num in enumerate(t):
    # task_3 starts only once the print_hello future `num` has finished
    task_3.submit(idx, wait_for=[num])
```
If you do it this way, you won't necessarily need to use `num.result()` in your `print_hello.submit` call.
If you want all of them to complete before calling `task_3`:
```python
task_3.submit("hello there", wait_for=t)
```
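The idea behind `wait_for` (start a task only after some other futures are done, without passing their results in) can be mimicked with the standard library to see the per-item and all-at-once usages side by side. This is a rough analogue, not how Prefect implements it: `submit_after` and `demo` are hypothetical, failures in the awaited futures are ignored here, and because each wrapper blocks a worker thread while it waits, the pool needs enough workers to avoid stalling.

```python
from __future__ import annotations

import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable


def submit_after(executor: ThreadPoolExecutor, fn: Callable[..., Any], *args: Any,
                 wait_for: Iterable[Future] = ()) -> Future:
    """Hypothetical helper: run fn(*args) only after every future in wait_for is done.

    The results of wait_for are not passed to fn; only the ordering is enforced.
    """
    deps = list(wait_for)

    def runner() -> Any:
        wait(deps)  # blocks this worker thread until all dependencies have finished
        return fn(*args)

    return executor.submit(runner)


def demo() -> list[str]:
    events: list[str] = []
    lock = threading.Lock()

    def log(msg: str) -> None:
        with lock:
            events.append(msg)

    def hello(i: int) -> int:
        time.sleep(0.01 * (3 - i))  # later ids finish sooner
        log(f"hello {i} done")
        return i

    def task_3(label: Any) -> None:
        log(f"task_3 {label} start")

    with ThreadPoolExecutor(max_workers=8) as executor:
        t = [executor.submit(hello, i) for i in range(3)]
        # Per-item dependency, analogous to task_3.submit(idx, wait_for=[num])
        per_item = [submit_after(executor, task_3, idx, wait_for=[num])
                    for idx, num in enumerate(t)]
        wait(per_item)
        # Barrier on every future, analogous to task_3.submit("hello there", wait_for=t)
        submit_after(executor, task_3, "hello there", wait_for=t).result()
    return events


if __name__ == "__main__":
    print("\n".join(demo()))
```

In the per-item form, each `task_3` can start as soon as its own `hello` finishes; the barrier form starts only after the slowest one.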
Selora
02/13/2025, 2:16 PM
```python
from time import sleep
from prefect import flow, get_run_logger, task
from prefect.futures import as_completed
import random

@task()
def get_number(id: int):
    sleep(random.uniform(2.0, 8.0))
    number = random.randint(0, 1000)
    print(f"{id}, fetching {number}")
    return number

@task()
def add_number(id: int, number: int):
    sleep(random.uniform(1.0, 4.0))
    r = number + id
    print(f"{id}, adding {number} + {id} = {r}")
    return r

@task()
def print_hello(id: int, number: int):
    print(f"hello {id}, you got {number}")
    return True

@flow()
def master_flow(parallel_tasks: int):
    numbers = get_number.map(range(parallel_tasks))
    t = []
    for id, num in zip(range(parallel_tasks), as_completed(numbers)):
        t.append(add_number.submit(id, num.result()))
    t2 = []
    for id, num in zip(range(parallel_tasks), as_completed(t)):
        t2.append(print_hello.submit(id, num.result()))
    return [r.result() for r in t2]

if __name__ == "__main__":
    master_flow(5)
```
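Even with `as_completed`, the two loops in this flow run one after the other: the first loop only ends once every `get_number` future has been yielded, so no `print_hello` is submitted until all of them have finished, however early an individual `add_number` completes. The stdlib reproduction below (plain `concurrent.futures`, not Prefect; `fetch`, `add`, `hello` and `two_loop_pipeline` are hypothetical stand-ins) records events to make that ordering visible.

```python
from __future__ import annotations

import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def two_loop_pipeline(n: int) -> list[str]:
    events: list[str] = []
    lock = threading.Lock()

    def log(msg: str) -> None:
        with lock:
            events.append(msg)

    def fetch(i: int) -> int:
        time.sleep(0.01 * (i + 1))  # item 0 is fast, item n-1 is slow
        log(f"fetch {i} done")
        return i

    def add(i: int, number: int) -> int:
        return number + i

    def hello(i: int, number: int) -> bool:
        log(f"hello {i} start")
        return True

    with ThreadPoolExecutor(max_workers=n) as executor:
        numbers = [executor.submit(fetch, i) for i in range(n)]
        t = []
        # Loop 1: only finishes after *every* fetch has completed.
        for i, fut in zip(range(n), as_completed(numbers)):
            t.append(executor.submit(add, i, fut.result()))
        # Loop 2: cannot begin until loop 1 is done.
        t2 = []
        for i, fut in zip(range(n), as_completed(t)):
            t2.append(executor.submit(hello, i, fut.result()))
        for fut in t2:
            fut.result()
    return events


if __name__ == "__main__":
    print("\n".join(two_loop_pipeline(4)))
```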
Selora
02/13/2025, 2:26 PM
```python
r = range(parallel_tasks)
return print_hello.map(r, add_number.map(r, get_number.map(r)))
```
This as well:
```python
r = range(parallel_tasks)
t1 = get_number.map(r)
t2 = add_number.map(r, t1)
t3 = print_hello.map(r, t2)
return t3.result()
```
I'm probably not wrapping my head around it properly. I guess I want to keep the benefits of proper scheduling while having manual control over how I dispatch results to consumers, rather than treating them as a "future block" of mapped tasks.
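One way to get per-item dispatch without a barrier between stages is a single scheduling loop that watches every in-flight future and hands each result to the next stage as soon as it lands. The sketch below shows that loop with the standard library's `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)`. It illustrates the scheduling idea only and is not Prefect API; `run_pipeline` is hypothetical, and `get_number`, `add_number` and `print_hello` here are plain-function stand-ins for the tasks of the same name. Adapting it to Prefect futures is an assumption that would need verifying.

```python
from __future__ import annotations

import random
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Sequence


def run_pipeline(items: Sequence[int], stages: Sequence[Callable[[int, Any], Any]],
                 max_workers: int = 8) -> dict[int, Any]:
    """Push each item through `stages`; each stage is called as stage(item_id, previous_value).

    An item's next stage is submitted as soon as its previous stage finishes,
    independently of how far the other items have progressed.
    """
    results: dict[int, Any] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # future -> (item_id, index of the stage that produced it)
        pending: dict[Future, tuple[int, int]] = {
            executor.submit(stages[0], item_id, None): (item_id, 0) for item_id in items
        }
        while pending:
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                item_id, stage_idx = pending.pop(fut)
                value = fut.result()  # re-raises if the stage failed
                if stage_idx + 1 < len(stages):
                    nxt = executor.submit(stages[stage_idx + 1], item_id, value)
                    pending[nxt] = (item_id, stage_idx + 1)
                else:
                    results[item_id] = value
    return results


def get_number(item_id: int, _: Any) -> int:
    time.sleep(random.uniform(0.01, 0.05))
    return item_id * 100


def add_number(item_id: int, number: int) -> int:
    time.sleep(random.uniform(0.01, 0.03))
    return number + item_id


def print_hello(item_id: int, number: int) -> str:
    return f"hello {item_id}, you got {number}"


if __name__ == "__main__":
    results = run_pipeline(range(5), [get_number, add_number, print_hello])
    for item_id in sorted(results):
        print(results[item_id])
```

The loop is the only place results are routed, so you keep explicit control over which consumer receives each value, while the pool still runs every stage that is ready.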