# ask-community
s
Hey there, optimization question. If I run a bunch of tasks in bulk where each task is dependent on the previous result, but the result might arrive later, is there a built-in mechanism to schedule the tasks for the next stage as soon as they're ready? Consider the following, which works, up until you add several different tasks waiting on each other. The `as_completed` iteration blocks the sync execution for the rest. `map` also seems to block execution and wait for all futures to be completed.
```python
from time import sleep
from prefect import flow, task
from prefect.futures import as_completed
import random


@task()
def get_number(id: int):
    sleep(random.uniform(2.0, 8.0))
    number = random.randint(0, 1000)
    print(f"{id}, fetching {number}")
    return number


@task()
def print_hello(id: int, number: int):
    print(f"hello {id}, you got {number}")
    return True


@flow()
def master_flow(parallel_tasks: int):

    numbers = get_number.map(range(parallel_tasks))
    t = []
    # This loop only exits once every get_number future has completed,
    # so anything after it is delayed until then.
    # (Note: zip pairs sequential ids with completion-order futures,
    # so `id` may not match the future's original input.)
    for id, num in zip(range(parallel_tasks), as_completed(numbers)):
        t.append(print_hello.submit(id, num.result()))

    # Imaginary task #3 that uses the same pattern to consume completed
    # futures from print_hello here. This wouldn't start until _all_
    # print_hello tasks are submitted, although some futures of
    # print_hello might already be ready!
```
I guess splitting that part into async tasks that yield and also return futures could maybe work?
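One async shape that idea could take, as a sketch (assuming a recent Prefect, where async tasks awaited inside an async flow can run concurrently under asyncio.gather; the task bodies here are simplified stand-ins):
```python
import asyncio
import random

from prefect import flow, task


@task
async def get_number(id: int) -> int:
    await asyncio.sleep(random.uniform(0.1, 0.5))
    return random.randint(0, 1000)


@task
async def print_hello(id: int, number: int) -> bool:
    print(f"hello {id}, you got {number}")
    return True


@flow
async def master_flow(parallel_tasks: int):
    async def chain(id: int) -> bool:
        # Each chain advances on its own; there is no stage-wide barrier.
        number = await get_number(id)
        return await print_hello(id, number)

    return await asyncio.gather(*(chain(i) for i in range(parallel_tasks)))


if __name__ == "__main__":
    asyncio.run(master_flow(5))
```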
t
I may not be following completely, but there is the `wait_for` keyword argument in `submit` and `map`.
```python
for idx, num in enumerate(t):
    task_3.submit(idx, wait_for=[num])
```
If you do it this way, you won't necessarily need to use `num.result()` in your `print_hello.submit` call. If you want them all to complete before calling `task_3`:
```python
task_3.submit("hello there", wait_for=t)
```
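A self-contained sketch of that pattern (assuming Prefect 3.x; `upstream` and `task_3` are placeholder names from this thread, and `wait_for` is given a list of futures):
```python
from prefect import flow, task


@task
def upstream(i: int) -> int:
    return i * 2


@task
def task_3(idx: int):
    # wait_for only gates scheduling; the upstream's result is not passed in.
    print(f"task_3 running for {idx}")


@flow
def demo(n: int = 3):
    t = [upstream.submit(i) for i in range(n)]
    done = []
    for idx, num in enumerate(t):
        # Each task_3 waits only on its own upstream future.
        done.append(task_3.submit(idx, wait_for=[num]))
    # Or gate one final call on the whole batch:
    task_3.submit(n, wait_for=done).wait()


if __name__ == "__main__":
    demo()
```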
s
I want it kind of the other way around; see this picture of the flow with an actual third task in the pipeline. The flow executes 5x of each of these: get_number -> add_number -> print_hello. `print_hello` only starts after all of the `add_number` tasks, the second stage in line, have been submitted. `print_hello` could've started much earlier. Code:
```python
from time import sleep
from prefect import flow, task
from prefect.futures import as_completed
import random


@task()
def get_number(id: int):
    sleep(random.uniform(2.0, 8.0))
    number = random.randint(0, 1000)
    print(f"{id}, fetching {number}")
    return number


@task()
def add_number(id: int, number: int):
    sleep(random.uniform(1.0, 4.0))
    r = number + id
    print(f"{id}, adding {number} + {id} = {r}")
    return r


@task()
def print_hello(id: int, number: int):
    print(f"hello {id}, you got {number}")
    return True


@flow()
def master_flow(parallel_tasks: int):
    numbers = get_number.map(range(parallel_tasks))
    t = []
    # This loop doesn't exit until every get_number future has completed,
    # so the next stage below can't begin early.
    for id, num in zip(range(parallel_tasks), as_completed(numbers)):
        t.append(add_number.submit(id, num.result()))

    t2 = []
    # Same barrier again: print_hello only starts once all add_number
    # tasks are submitted.
    for id, num in zip(range(parallel_tasks), as_completed(t)):
        t2.append(print_hello.submit(id, num.result()))

    return [r.result() for r in t2]


if __name__ == "__main__":
    master_flow(5)
```
Doing this works as intended:
```python
# inside master_flow
r = range(parallel_tasks)
return print_hello.map(r, add_number.map(r, get_number.map(r)))
```
This works as well:
```python
# inside master_flow
r = range(parallel_tasks)
t1 = get_number.map(r)

t2 = add_number.map(r, t1)
t3 = print_hello.map(r, t2)
return t3.result()
```
I'm probably not wrapping my head around it properly. I guess I want to keep the benefits of proper scheduling while retaining manual control over how I dispatch the results to consumers, instead of treating them like a "future block" of mapped tasks.
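One pattern that seems to fit that ask, sketched here rather than taken from the thread: chain `submit` calls per item and pass each `PrefectFuture` straight into the next call. Prefect resolves future arguments before running the downstream task, so each chain advances as soon as its own upstream finishes, with no stage-wide barrier and full manual control over dispatch:
```python
from prefect import flow

# get_number, add_number, and print_hello as defined earlier in the thread


@flow
def master_flow(parallel_tasks: int):
    results = []
    for i in range(parallel_tasks):
        num = get_number.submit(i)           # stage 1 future
        added = add_number.submit(i, num)    # waits only on its own num
        results.append(print_hello.submit(i, added))
    return [f.result() for f in results]
```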