Alex Beatson
04/12/2023, 7:24 PM
Given a future x = task_1.submit(), if we pass that future as an input to another task, y = task_2.submit(x=x), Ray will spin up a worker for task_2 with the requested resources, and this worker will wait on x.result().
This is obviously undesirable! If we had x1 = task1(); x2 = task2(x1=x1); x3 = task3(x2=x2); ..., all the later tasks in the pipeline would be wasting worker resources while waiting for the earlier tasks to complete. We would like the worker for a task to launch only when the task's inputs become ready.
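To make the desired behavior concrete, here is a minimal sketch that uses only the Python standard library, not Prefect or Ray. The helper `submit_when_ready` and the toy functions `task_1`, `task_2`, `task_3` are hypothetical names made up for illustration. The idea is that a downstream task is handed to the pool only once its upstream future has finished, so it never occupies a worker while waiting.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def submit_when_ready(pool, fn, upstream):
    """Submit fn(upstream_result) to `pool` only after `upstream` completes.

    Hypothetical helper for illustration; not part of Prefect or Ray.
    Returns a placeholder Future that resolves with fn's result.
    """
    downstream = Future()

    def _copy(inner):
        # Forward the finished task's outcome to the placeholder future.
        exc = inner.exception()
        if exc is not None:
            downstream.set_exception(exc)
        else:
            downstream.set_result(inner.result())

    def _launch(done):
        # Runs once the upstream future is done; only now do we take a worker.
        exc = done.exception()
        if exc is not None:
            downstream.set_exception(exc)
            return
        pool.submit(fn, done.result()).add_done_callback(_copy)

    upstream.add_done_callback(_launch)
    return downstream


def task_1():
    return 1


def task_2(x1):
    return x1 + 1


def task_3(x2):
    return x2 * 10


def run_pipeline():
    # A single worker is enough: no task holds the worker while it waits.
    with ThreadPoolExecutor(max_workers=1) as pool:
        x1 = pool.submit(task_1)
        x2 = submit_when_ready(pool, task_2, x1)
        x3 = submit_when_ready(pool, task_3, x2)
        return x3.result(timeout=5)


if __name__ == "__main__":
    print(run_pipeline())
```

If each task instead blocked on its input inside a worker, as described above, a one-worker pool could stall with a waiting task holding the only worker. Here the three-stage pipeline completes on a single worker.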
Is this known behavior / does anyone know how to solve this?
Here's a working example:
from prefect import flow, task
from prefect_ray import RayTaskRunner
from prefect_ray.context import remote_options
import time


@task
def task_a():
    # Slow upstream task that the others depend on.
    time.sleep(100)
    return 'a'


@task
def task_b():
    return 'b'


@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    )
)
def multi_test():
    with remote_options(num_cpus=1, num_gpus=1):
        x = task_a.submit()
    # Ten downstream tasks, each of which must wait for task_a to finish.
    for i in range(10):
        with remote_options(num_cpus=1, num_gpus=1):
            task_b.submit(wait_for=[x])


if __name__ == '__main__':
    print(multi_test())
When run, we immediately get 11 workers created, per the image.
CC @Muxin Fang @Haotian Li