Alex Beatson

04/12/2023, 7:24 PM
Prefect-Ray spins up workers before the inputs for the worker tasks are ready / before the task's dependencies are complete

Hi folks, we're using Prefect and Ray to orchestrate distributed workflows. When we generate a future, `x = task_1.submit()`, and pass that future as an input to another task, `y = task_2.submit(x=x)`, Ray spins up a worker for `task_2` with the requested resources, and this worker then waits on `x.result()`. This is obviously undesirable! If we had `x1 = task_1.submit(); x2 = task_2.submit(x1=x1); x3 = task_3.submit(x2=x2); ...`, all the later tasks in the pipeline would hold worker resources while waiting for the earlier workers to complete. We would like the worker for a task to launch only when the task's inputs become ready. Is this known behavior, and does anyone know how to solve it? Here's a working example:
from prefect import flow, task
from prefect_ray import RayTaskRunner
from prefect_ray.context import remote_options
import time


@task
def task_a():
    time.sleep(100)
    return 'a'


@task
def task_b():
    return 'b'


@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    )
)
def multi_test():
    with remote_options(num_cpus=1, num_gpus=1):
        x = task_a.submit()
    for i in range(10):
        with remote_options(num_cpus=1, num_gpus=1):
            task_b.submit(wait_for=[x])


if __name__ == '__main__':
    print(multi_test())
When run, we immediately get 11 workers created (one for `task_a` and ten for `task_b`), per the image. CC @Muxin Fang @Haotian Li
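The behavior being asked for is dependency-gated dispatch: a downstream task is handed to the executor only after its upstream futures have finished, instead of claiming a worker and then blocking inside it on `x.result()`. The sketch below contrasts the two approaches using only the Python standard library; it is not Ray or prefect-ray code and does not reflect how prefect-ray is implemented. Pool threads stand in for Ray workers, `release.wait()` stands in for `time.sleep(100)`, and `Occupancy`, `eager_submit`, `gated_submit`, and `demo` are hypothetical helpers written for illustration.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor


class Occupancy:
    """Counts pool threads currently inside a task body, whether working or blocked."""

    def __init__(self):
        self._lock = threading.Lock()
        self.current = 0

    def __enter__(self):
        with self._lock:
            self.current += 1

    def __exit__(self, *exc_info):
        with self._lock:
            self.current -= 1


def eager_submit(pool, occ, fn, deps=()):
    """Hypothetical: take a worker slot immediately, then block inside it until inputs resolve."""
    def run():
        with occ:
            args = [d.result() for d in deps]  # the slot sits idle here while upstream runs
            return fn(*args)
    return pool.submit(run)


def gated_submit(pool, occ, fn, deps=()):
    """Hypothetical: submit to the pool only after every dependency future has completed."""
    out = Future()
    remaining = [len(deps)]
    lock = threading.Lock()

    def launch():
        def run():
            with occ:
                return fn(*[d.result() for d in deps])  # deps are already done, no waiting

        def relay(inner):
            # Copy the pool future's outcome onto the future we handed back to the caller.
            exc = inner.exception()
            if exc is not None:
                out.set_exception(exc)
            else:
                out.set_result(inner.result())

        pool.submit(run).add_done_callback(relay)

    def on_dep_done(_):
        with lock:
            remaining[0] -= 1
            ready = remaining[0] == 0
        if ready:
            launch()

    if not deps:
        launch()
    for d in deps:
        d.add_done_callback(on_dep_done)
    return out


def demo(submit):
    """One slow upstream task feeding ten downstream tasks, mirroring multi_test above."""
    occ = Occupancy()
    release = threading.Event()

    def slow_a():
        release.wait()  # stands in for time.sleep(100)
        return "a"

    with ThreadPoolExecutor(max_workers=16) as pool:
        upstream = submit(pool, occ, slow_a)
        downstream = [submit(pool, occ, lambda a: a + "b", (upstream,)) for _ in range(10)]
        time.sleep(0.3)  # give the pool time to start whatever it was handed
        busy_while_upstream_runs = occ.current
        release.set()
        results = [f.result() for f in downstream]
    return busy_while_upstream_runs, results


print(demo(eager_submit)[0])  # 11: upstream plus ten blocked downstream slots
print(demo(gated_submit)[0])  # 1: only the upstream task holds a slot
```

The eager variant reproduces the 11-worker picture from the post, while the gated variant holds a single slot until `slow_a` finishes and only then queues the ten downstream tasks. The trade-off is a small dispatch delay (downstream work is queued from a completion callback) in exchange for not reserving resources during the wait.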