Alan
11/29/2024, 7:56 PMfrom prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
@task
def parallel_task(task_id: int):
return f"Result from task {task_id}"
@task
def join_task(results):
combined_result = " | ".join(results)
return f"Joined result: {combined_result}"
@flow(task_runner=RayTaskRunner(address="<ray://raycluster-kuberay-head-svc.kuberay.svc.cluster.local:10001>"))
def fork_and_join_workflow():
# Fork: Execute 50 parallel tasks
parallel_results = [parallel_task.submit(i) for i in range(50)]
# Join: Aggregate results from all tasks
results = [result.result() for result in parallel_results]
joined_result = join_task(results)
print(joined_result)
if __name__ == "__main__":
fork_and_join_workflow()
Now I want to allocate 1 GPU and 4 CPU for each task that is submitted over to ray, how can I make sure of that?! In regular ray, I can just decorate a function with @ray.remote(num_gpus=1, num_cpus=4), but I cannot do that with prefect tasks?