Andrew Xue
08/20/2025, 6:34 PM

Marvin
08/20/2025, 6:36 PM

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

# Connect to an existing Ray cluster; or omit address to start a local Ray
ray_runner = RayTaskRunner(address="ray://<ray-head-host>:10001")

@task
def quick_local(x):
    return x + 1

@task
def heavy_cpu(x):
    # imagine something computationally intense here
    return sum(i * i for i in range(x))

@task
def heavy_gpu(x):
    # placeholder for GPU work
    return x * 2

@flow(task_runner=ray_runner)
def ray_subflow(n: int):
    # Fine-tune Ray resources per task call
    with remote_options(num_cpus=2):
        a = heavy_cpu.submit(n)
    with remote_options(num_gpus=1):
        b = heavy_gpu.submit(n)
    return a.result(), b.result()

@flow  # default (local) runner - nothing goes to Ray here
def main():
    local = quick_local(5)
    ray_results = ray_subflow(1_000_000)  # only this subflow runs on Ray
    more_local = quick_local(local)
    return ray_results, more_local

if __name__ == "__main__":
    main()
If you only want a single task to run on Ray, use a tiny wrapper subflow:
@flow(task_runner=ray_runner)
def run_on_ray(x):
    with remote_options(num_cpus=4):
        return heavy_cpu.submit(x).result()

@flow
def main():
    local = quick_local(10)
    ray_value = run_on_ray(2_000_000)
    return local, ray_value
Notes:
- prefect_ray.context.remote_options(...) lets you set Ray remote options per submitted task (e.g., num_cpus, num_gpus, resources, scheduling_strategy); see the sketch after these notes.
- Install extras with prefect[ray].
- When using an external cluster, ensure your flow code can reach the Ray head node at the ray:// address.
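For example, a rough sketch combining those options (the "big-mem" custom resource and the SPREAD scheduling strategy are illustrative; they only mean something if your Ray nodes actually define or support them):

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def crunch(x):
    return x * x

@flow(task_runner=RayTaskRunner(address="ray://<ray-head-host>:10001"))
def tuned_flow(n: int):
    # Options set here are forwarded to Ray's .options() for each submitted task
    with remote_options(
        num_cpus=2,
        resources={"big-mem": 1},      # hypothetical custom resource on your nodes
        scheduling_strategy="SPREAD",  # spread the work across nodes
    ):
        futures = [crunch.submit(i) for i in range(n)]
    return [f.result() for f in futures]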
Docs: Prefect Ray integration
If you share how you’re deploying (local, Docker, K8s) and whether you have an existing Ray cluster, I can tailor the snippet to your setup. I know, thrilling.

Andrew Xue
08/20/2025, 6:42 PM

from functools import wraps
from prefect import flow, task
from prefect_ray import RayTaskRunner

def ray_task(address: str = "<ray://127.0.0.1:10001>", **task_kwargs):
    """
    Decorator that wraps a function in a Prefect flow using RayTaskRunner,
    then wraps that flow in a Prefect task so you can call it normally or map it.
    """
    def decorator(fn):
        # Flow that runs this function on Ray
        @flow(task_runner=RayTaskRunner(address=address))
        @wraps(fn)
        def ray_flow(*args, **kwargs):
            return fn(*args, **kwargs)

        # Prefect task wrapper around the flow
        @task(**task_kwargs)
        @wraps(fn)
        def wrapped_task(*args, **kwargs):
            return ray_flow(*args, **kwargs)

        return wrapped_task
    return decorator

# ---------------- Example usage ---------------- #
@ray_task(name="ray_add_task")
def add(x, y):
    return x + y

@flow
def main():
    nums1 = [1, 2, 3, 4]
    nums2 = [10, 20, 30, 40]
    # parallel map
    results = add.map(nums1, nums2)
    # direct call
    direct = add(5, 7)
    return results, direct

if __name__ == "__main__":
    results, direct = main()
    print("Mapped results:", [r.result() for r in results])
    print("Direct result:", direct.result())
Why am I getting RuntimeError: Ray Client is already connected. Maybe you called ray.init("ray://<address>") twice by accident?
Marvin
08/20/2025, 6:44 PM

Because every call to your decorated function starts its own RayTaskRunner flow, and each of those opens a fresh Ray Client connection to the same ray:// address; the second connection attempt is what trips "Ray Client is already connected". Let a single subflow own the Ray connection and map plain tasks inside it instead:

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def add(x, y):
    return x + y

# Only this subflow talks to Ray
@flow(task_runner=RayTaskRunner(address="ray://127.0.0.1:10001"))
def ray_add_many(xs, ys):
    # Optional: tune Ray resources per submitted call
    with remote_options(num_cpus=1):
        futures = [add.submit(x, y) for x, y in zip(xs, ys)]
    return [f.result() for f in futures]

@flow
def main():
    nums1 = [1, 2, 3, 4]
    nums2 = [10, 20, 30, 40]
    # Parallel on Ray
    results = ray_add_many(nums1, nums2)
    # Local task call
    direct = add(5, 7)
    return results, direct
If you really want to keep your decorator:
- You can try passing init_kwargs={"ignore_reinit_error": True} to RayTaskRunner. That suppresses the re-init exception:
  RayTaskRunner(address="ray://127.0.0.1:10001", init_kwargs={"ignore_reinit_error": True})
But beware: multiple overlapping subflows will each enter/exit their own Ray contexts, which can race with connection shutdown. It’s fragile; I wouldn’t ship it.
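If you do go that route anyway, it would slot into the decorator roughly like this (a sketch only, and still the fragile pattern just described):

from functools import wraps
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner

def ray_task(address: str = "ray://127.0.0.1:10001", **task_kwargs):
    def decorator(fn):
        # Each call still gets its own Ray-backed flow; ignore_reinit_error
        # papers over the duplicate ray.init() rather than preventing it
        @flow(task_runner=RayTaskRunner(
            address=address,
            init_kwargs={"ignore_reinit_error": True},
        ))
        @wraps(fn)
        def ray_flow(*args, **kwargs):
            return fn(*args, **kwargs)

        @task(**task_kwargs)
        @wraps(fn)
        def wrapped_task(*args, **kwargs):
            return ray_flow(*args, **kwargs)

        return wrapped_task
    return decorator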
Two more nits:
- Don’t include angle brackets in the address. Use <ray://127.0.0.1:10001>
, not "<ray://127.0.0.1:10001>"
.
- Prefer from prefect_ray.task_runners import RayTaskRunner
and from prefect_ray.context import remote_options
.
Docs: Prefect Ray integration
If you share whether you’re on an external Ray cluster or local, and how you deploy, I can tailor the pattern to your setup. I’ll try to contain my excitement.

Andrew Xue
08/20/2025, 6:51 PM

Marvin
08/20/2025, 6:52 PM

import os
import threading

import ray
from prefect import flow, task

_RAY_LOCK = threading.Lock()

def ensure_ray():
    if not ray.is_initialized():
        with _RAY_LOCK:
            if not ray.is_initialized():
                addr = os.getenv("RAY_ADDRESS", "ray://127.0.0.1:10001")
                ray.init(address=addr, ignore_reinit_error=True)

@ray.remote
def add_remote(x, y):
    return x + y

@task
def add_via_ray(x, y):
    ensure_ray()
    return ray.get(add_remote.remote(x, y))

@flow
def main():
    xs, ys = [1, 2, 3, 4], [10, 20, 30, 40]
    results = add_via_ray.map(xs, ys)  # mapped Prefect tasks; each calls Ray
    return [r.result() for r in results]

if __name__ == "__main__":
    print(main())
When to prefer RayTaskRunner
- If you want Prefect to manage concurrency, retries, and visibility across many tasks running on Ray, wrap that part of your workflow in a subflow with RayTaskRunner and use prefect_ray.context.remote_options for per-call resources. It’s cleaner and less brittle; see the sketch below.
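Roughly like this (a sketch; the retry settings, address, and resource numbers are placeholders to adjust for your setup):

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

# Prefect handles the retries even though the work executes on Ray workers
@task(retries=3, retry_delay_seconds=10)
def flaky_transform(x):
    return x * 2

@flow(task_runner=RayTaskRunner(address="ray://127.0.0.1:10001"))
def ray_section(items):
    with remote_options(num_cpus=1):
        futures = [flaky_transform.submit(i) for i in items]
    return [f.result() for f in futures]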
Docs: Prefect + Ray integration.