
Christian Vogel

09/14/2022, 11:48 AM
Hi Prefect Community! Do you know if it is already possible to specify CPU/GPU resource requirements per task for the RayTaskRunner? So far I have only managed to configure the Ray cluster, but I am not sure whether it's possible to map tasks to the different worker types so that the cluster autoscales accordingly (for example, only scaling out GPU workers when needed).
Basically, I was wondering whether a RayTaskRunner equivalent to this is possible:
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def show(x):
    print(x)

# Create a `LocalCluster` with some resource annotations.
# Annotations are abstract in Dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker.
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    # Tasks must be submitted with .submit() to go through the task runner.
    with dask.annotate(resources={"GPU": 1}):
        future = show.submit(0)  # this task requires 1 GPU resource on a worker

    with dask.annotate(resources={"process": 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially.
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)
Source: https://discourse.prefect.io/t/how-to-handle-custom-resource-cpu-vs-gpu-and-code-dependencies-across-various-tasks-and-flows/503/2
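To make the semantics in the comments above concrete ("annotations are abstract", "these tasks will run sequentially"), here is a toy, stdlib-only model of a single worker with declared resources. It is a sketch under stated assumptions, not how Dask or Ray schedule internally, and `AbstractResources`, `run_annotated` and `slow_show` are hypothetical names. A task waits until the worker's free counts cover its requirement, so three tasks that each need the worker's only `process` slot never overlap.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class AbstractResources:
    """Toy model of one worker's declared resources, e.g. {"GPU": 1, "process": 1}.

    As in the Dask example, the counts are labels the user asserts;
    nothing here inspects the real hardware.
    """

    def __init__(self, capacity):
        self._capacity = dict(capacity)
        self._free = dict(capacity)
        self._cond = threading.Condition()

    def acquire(self, need):
        # A requirement larger than the worker's total would wait forever, so fail fast.
        for name, amount in need.items():
            if amount > self._capacity.get(name, 0):
                raise ValueError(f"worker cannot provide {amount} x {name!r}")
        with self._cond:
            # Block until every requested resource has enough free units, then claim them.
            self._cond.wait_for(
                lambda: all(self._free.get(n, 0) >= a for n, a in need.items())
            )
            for name, amount in need.items():
                self._free[name] = self._free.get(name, 0) - amount

    def release(self, need):
        with self._cond:
            for name, amount in need.items():
                self._free[name] = self._free.get(name, 0) + amount
            self._cond.notify_all()


def run_annotated(worker, need, items, work):
    """Run work(x) for every x, each call holding `need` on `worker`.

    Returns (results in input order, peak number of calls running at once).
    """
    lock = threading.Lock()
    active = 0
    peak = 0

    def guarded(x):
        nonlocal active, peak
        worker.acquire(need)
        try:
            with lock:
                active += 1
                peak = max(peak, active)
            return work(x)
        finally:
            with lock:
                active -= 1
            worker.release(need)

    # One thread per item, so only the resource accounting limits concurrency.
    with ThreadPoolExecutor(max_workers=max(1, len(items))) as pool:
        results = list(pool.map(guarded, items))
    return results, peak


def slow_show(x):
    time.sleep(0.05)  # stand-in for real work
    return x


worker = AbstractResources({"GPU": 1, "process": 1})
print(run_annotated(worker, {"GPU": 1}, [0], slow_show))  # ([0], 1)
print(run_annotated(worker, {"process": 1}, [1, 2, 3], slow_show))  # ([1, 2, 3], 1)
```

Ray uses the same kind of logical accounting: nodes advertise CPU, GPU and custom resource counts, and each Ray task requests amounts (for example via `@ray.remote(num_gpus=1)`).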

Ryan Peden

09/14/2022, 12:25 PM
Hi Christian, looking at the RayTaskRunner code on GitHub, the constructor takes two arguments: the address of a Ray cluster and a dict of arguments that it passes to Ray's init function (ray.init). You can see a list of all the args ray.init accepts on this page in Ray's docs. So, in your case, I think you'd want something like:
@flow(task_runner=RayTaskRunner(address="ray://my_cluster_ip:10001", init_kwargs={"num_cpus": 2, "num_gpus": 1}))  # 10001 is the default Ray Client port
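A minimal model of the forwarding described above may help: the runner stores an address plus a dict, and later unpacks the dict as keyword arguments next to the address when it calls the init function. `SketchRayTaskRunner` and `fake_ray_init` are hypothetical stand-ins; the real prefect-ray class does much more (futures, shutdown), and only the argument flow is modeled here.

```python
from typing import Any, Callable, Dict, Optional


def fake_ray_init(address: Optional[str] = None, **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical stand-in for ray.init: it only records what it was called with."""
    return {"address": address, **kwargs}


class SketchRayTaskRunner:
    """Hypothetical, simplified model of the constructor described above."""

    def __init__(
        self,
        address: Optional[str] = None,
        init_kwargs: Optional[Dict[str, Any]] = None,
        init_fn: Callable[..., Dict[str, Any]] = fake_ray_init,
    ) -> None:
        self.address = address
        # Copy so later edits to the caller's dict don't leak into the runner.
        self.init_kwargs = dict(init_kwargs or {})
        self._init_fn = init_fn

    def start(self) -> Dict[str, Any]:
        # The stored dict is unpacked as keyword arguments alongside the address.
        return self._init_fn(address=self.address, **self.init_kwargs)


runner = SketchRayTaskRunner(
    address="ray://my_cluster_ip:10001",
    init_kwargs={"num_cpus": 2, "num_gpus": 1},
)
print(runner.start())
# {'address': 'ray://my_cluster_ip:10001', 'num_cpus': 2, 'num_gpus': 1}
```

Because the dict is a second argument, it has to be passed by name (`init_kwargs=...`) once `address=` is given as a keyword; a bare positional dict after a keyword argument is a SyntaxError in Python.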

Christian Vogel

09/14/2022, 12:30 PM
Hi Ryan, thanks for your help. I thought this config was for configuring the cluster itself, but I'll give it a try now!
If it works like this, it would enable resource separation at the flow level (rather than the task level), right?
I quickly tried it out, and it seems that because we already have a running Ray cluster, we cannot specify the resources:
ValueError: When connecting to an existing cluster, num_cpus and num_gpus must not be provided.
Here is the code:
from prefect import task, flow
from prefect_ray.task_runners import RayTaskRunner
import time


@flow(name="time-wait-flow", task_runner=RayTaskRunner(address="auto", init_kwargs={"num_cpus": 2}))
def time_wait_flow():
    factor = 50
    times = [5.0, 16.3, 25.5]
    for _ in range(factor):
        for time_in_secs in times:
            wait_time.submit(time_in_secs)

@task
def wait_time(time_in_secs):
    time.sleep(time_in_secs)


if __name__ == "__main__":
    time_wait_flow()
Do you see anything wrong? Or might there be another way to specify the resource requirements?
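The ValueError is raised by Ray rather than Prefect: `init_kwargs` ends up in `ray.init`, and there `num_cpus`/`num_gpus` declare how many CPUs/GPUs a newly started local Ray node should advertise. With `address="auto"`, `ray.init` attaches to the cluster that is already running instead, so, as the error says, those sizing arguments are rejected. A minimal sketch of that rule follows; `check_init_kwargs` is a hypothetical helper that mirrors the error message, not Ray's own code, and it takes an explicit flag because Ray's address resolution has more cases than this sketch models.

```python
from typing import Any, Dict

# ray.init keywords that size a newly started node (per the error message above).
NODE_SIZING_KEYS = ("num_cpus", "num_gpus")


def check_init_kwargs(connecting_to_existing: bool, init_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical pre-flight check mirroring the ValueError above; not Ray's own code.

    Returns a copy of the kwargs when they are acceptable.
    """
    offending = [key for key in NODE_SIZING_KEYS if key in init_kwargs]
    if connecting_to_existing and offending:
        raise ValueError(
            "When connecting to an existing cluster, num_cpus and num_gpus "
            f"must not be provided (got: {', '.join(offending)})"
        )
    return dict(init_kwargs)


# Starting a fresh local Ray instance: sizing it is allowed.
print(check_init_kwargs(False, {"num_cpus": 2}))  # {'num_cpus': 2}

# Attaching to a running cluster (e.g. address="auto"): rejected, as in the traceback.
try:
    check_init_kwargs(True, {"num_cpus": 2})
except ValueError as err:
    print(err)
```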

Ryan Peden

09/14/2022, 12:57 PM
My apologies, I'm running a quick test of something else that might work here.
Hi Christian, I don't see a way to make this happen on a per-task basis right now. I'll do a bit more digging to see if I can find anything useful, but if this use case is important to you, it might be worth raising it in the prefect-ray repository on GitHub.

Christian Vogel

09/15/2022, 7:21 AM
Thanks for your assistance Ryan!
@Ryan Peden But were you able to make it run on a per-flow basis?