Christian Vogel
09/14/2022, 11:48 AM
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def show(x):
    print(x)
# Create a `LocalCluster` with some resource annotations
# Annotations are abstract in dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={'GPU': 1}):
        future = show(0)  # this task requires 1 GPU resource on a worker

    with dask.annotate(resources={'process': 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
        future = show(1)
        future = show(2)
        future = show(3)
Source: https://discourse.prefect.io/t/how-to-handle-custom-resource-cpu-vs-gpu-and-code-dependencies-across-various-tasks-and-flows/503/2
Ryan Peden
09/14/2022, 12:25 PM
Those extra arguments get passed to Ray's init function. You can see a list of all the args ray.init can accept in Ray's docs.
So, in your case, I think you'd want something like:
@flow(task_runner=RayTaskRunner(address="ray://my_cluster_ip:8786", init_kwargs={"num_cpus": 2, "num_gpus": 1}))
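For comparison, a minimal self-contained sketch (the say_hello task and hello_flow names are hypothetical) of passing those limits through init_kwargs when RayTaskRunner starts its own local Ray instance, i.e. when no address is given:

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner

@task
def say_hello(name):
    print(f"hello {name}")

# With no address, RayTaskRunner starts a local Ray instance, so logical
# resource limits such as num_cpus/num_gpus can go straight into init_kwargs,
# which are forwarded to ray.init.
@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 2, "num_gpus": 1}))
def hello_flow():
    say_hello.submit("ray")

if __name__ == "__main__":
    hello_flow()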
Christian Vogel
09/14/2022, 12:30 PM
ValueError: When connecting to an existing cluster, num_cpus and num_gpus must not be provided.
from prefect import task, flow
from prefect_ray.task_runners import RayTaskRunner
import time

@flow(name="time-wait-flow", task_runner=RayTaskRunner(address="auto", init_kwargs={"num_cpus": 2}))
def time_wait_flow():
    factor = 50
    times = [5.0, 16.3, 25.5]
    for i in range(0, factor):
        for time_in_secs in times:
            wait_time.submit(time_in_secs)

@task
def wait_time(time_in_secs):
    time.sleep(time_in_secs)

if __name__ == "__main__":
    time_wait_flow()
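That error comes from ray.init itself: when an address is given, the CPU/GPU counts are fixed when the cluster is started, not by the connecting client. A minimal sketch of that direction, assuming the cluster was started separately with its resources declared (for example ray start --head --num-cpus=2 --num-gpus=1), so init_kwargs no longer needs num_cpus:

from prefect import task, flow
from prefect_ray.task_runners import RayTaskRunner
import time

@task
def wait_time(time_in_secs):
    time.sleep(time_in_secs)

# Connect to the already-running cluster; its CPU/GPU counts were declared
# when the cluster was started (assumed above), so only connection options
# are passed to the task runner here.
@flow(name="time-wait-flow", task_runner=RayTaskRunner(address="auto"))
def time_wait_flow():
    for time_in_secs in [5.0, 16.3, 25.5]:
        wait_time.submit(time_in_secs)

if __name__ == "__main__":
    time_wait_flow()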
Ryan Peden
09/14/2022, 12:57 PM
Christian Vogel
09/15/2022, 7:21 AM