Marvin (08/08/2025, 11:50 PM):

1) Install the Ray integration and use the Ray task runner in your flow

pip install prefect-ray ray

Then use the task runner in your flow:
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner

@task
def inc(x):
    return x + 1

@flow(task_runner=RayTaskRunner(address="auto", init_kwargs={"namespace": "prefect"}))
def my_flow():
    futures = [inc.submit(i) for i in range(10)]
    return sum(f.result() for f in futures)

if __name__ == "__main__":
    my_flow()
Notes:
- address="auto" connects to an existing Ray cluster if available; omit it to start a local Ray runtime.
- You can pass more Ray init options via init_kwargs={...}; see the sketch below for a remote-cluster example.
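For example, here is a minimal sketch of pointing the runner at an existing remote cluster. The ray:// address and the runtime_env packages are placeholders; init_kwargs is simply forwarded to ray.init:

from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-head:10001",  # placeholder address of your Ray head node
        init_kwargs={
            "namespace": "prefect",
            # runtime_env is a standard ray.init option; these packages are
            # installed on the Ray workers when the client connects
            "runtime_env": {"pip": ["prefect-ray"]},
        },
    )
)
def remote_cluster_flow():
    ...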
2) Create a work pool of the worker type you want
- For example, a Docker or Kubernetes pool (requires the relevant integration to be installed) or a simple Process pool:
prefect work-pool create "my-process-pool" --type process
# or
prefect work-pool create "my-docker-pool" --type docker
# or
prefect work-pool create "my-k8s-pool" --type kubernetes
Then start a worker for that pool:
prefect worker start -p my-process-pool
# or
prefect worker start -p my-docker-pool
# or
prefect worker start -p my-k8s-pool
The worker type is whatever the pool’s type is. That’s the “specific worker type.”
3) Target that pool when you deploy the flow
- In a Prefect 3 project, your prefect.yaml can specify the pool and its job variables. Example for Docker:
deployments:
  - name: ray-on-docker
    entrypoint: flows/my_flow.py:my_flow
    work_pool:
      name: my-docker-pool
      job_variables:
        image: my-org/prefect-ray:latest
        env:
          RAY_ADDRESS: "ray://ray-head:10001"
Then deploy:
prefect deploy
Key points to keep your sanity intact:
- Worker type ≠ task runner. Worker type is infrastructure (process, docker, kubernetes, …). Task runner is in-process concurrency (Ray, Dask, threads).
- Make sure the worker’s runtime environment has everything your flow needs: Prefect, your code, and Ray (plus cluster connectivity if you’re using a remote Ray cluster).
- If you don’t see your desired worker type, install its integration (e.g., prefect-docker, prefect-kubernetes) before creating the work pool.
If you tell me which worker type you’re aiming for (Docker? Kubernetes? Process?) and how you plan to run Ray (local vs. external cluster), I can give you a minimal, copy-pasteable config for that setup.
Useful docs, in case you like reading more than running flows:
- Work pools and workers
- Deployments: Deployments guide
- Ray integration: prefect-ray docs (collection with RayTaskRunner)
Marvin (08/08/2025, 11:51 PM):

1) Install the Ray integration and define your flow with the RayTaskRunner

pip install prefect-ray ray
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
import os

@task
def inc(x):
    return x + 1

@flow(
    task_runner=RayTaskRunner(
        # Use an existing cluster if available, otherwise start a local Ray runtime
        address=os.getenv("RAY_ADDRESS", "auto"),
        # Optional: pass extra ray.init kwargs
        init_kwargs={"namespace": "prefect"},
    )
)
def my_flow():
    futures = [inc.submit(i) for i in range(10)]
    return sum(f.result() for f in futures)

if __name__ == "__main__":
    my_flow()
2) Create a work pool of your desired worker type and start a worker
- Process worker:
prefect work-pool create "ray-process-pool" --type process
prefect worker start -p ray-process-pool
- Docker worker:
prefect work-pool create "ray-docker-pool" --type docker
prefect worker start -p ray-docker-pool
- Kubernetes worker:
prefect work-pool create "ray-k8s-pool" --type kubernetes
prefect worker start -p ray-k8s-pool
3) Point your deployment at that pool (prefect.yaml)
Process example:
deployments:
  - name: ray-on-process
    entrypoint: flows/my_flow.py:my_flow
    work_pool:
      name: ray-process-pool
Docker example (make sure the image has prefect, prefect-ray, and ray):
deployments:
  - name: ray-on-docker
    entrypoint: flows/my_flow.py:my_flow
    work_pool:
      name: ray-docker-pool
      job_variables:
        image: my-org/prefect-ray:latest
        env:
          # If you're connecting to a remote Ray cluster
          RAY_ADDRESS: "ray://ray-head:10001"
Kubernetes example:
deployments:
  - name: ray-on-k8s
    entrypoint: flows/my_flow.py:my_flow
    work_pool:
      name: ray-k8s-pool
      job_variables:
        image: my-org/prefect-ray:latest
        namespace: default
        env:
          RAY_ADDRESS: "ray://ray-head:10001"
Then from your project directory:
prefect deploy
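If you'd rather not maintain a prefect.yaml, the same deployment can be registered from Python. A sketch, assuming Prefect 3's flow.from_source(...).deploy(...) API and a prebuilt image; the repo URL, entrypoint, pool name, and image are placeholders:

from prefect import flow

flow.from_source(
    source="https://github.com/my-org/my-repo",  # placeholder repo containing your flow code
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="ray-on-docker",
    work_pool_name="ray-docker-pool",
    image="my-org/prefect-ray:latest",  # placeholder image with prefect, prefect-ray, ray
    job_variables={"env": {"RAY_ADDRESS": "ray://ray-head:10001"}},
    build=False,  # assumes the image is already built and pushed
)

Either way, the worker for the pool launches the run, and the RayTaskRunner inside the flow handles the Ray side.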
Key points to keep the universe from collapsing (again):
- Worker type is the work pool’s type. RayTaskRunner runs within whatever infrastructure that worker starts (process, container, pod).
- If using Docker/Kubernetes, your image must include prefect, prefect-ray, ray, and any cluster config. Either set RAY_ADDRESS in the environment or pass address=... to RayTaskRunner.
- There is no “ray” worker type in Prefect; Ray is the in-process task runner.
If you tell me which worker type you actually want (process, docker, k8s) and whether Ray is local or a remote cluster, I’ll hand you a minimal, copy-paste deployment config.
References, in case you enjoy documentation as much as I do:
- Work pools
- Workers
- Deployments: Deployments guide
- Ray integration: prefect-ray docs
Saad80 (08/08/2025, 11:55 PM):

ray.worker.data, ray.worker.ml, etc. Each with its own Docker container defined. I want to be able to target specific Ray workers with specific Prefect flows.

Marvin (08/08/2025, 11:57 PM):

1) Install the Ray integration

pip install prefect-ray ray

2) Pin work to the right node types with Ray custom resources
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
import ray

# Define Ray-remote functions pinned to your node types
@ray.remote(num_cpus=2, resources={"ray.worker.data": 0.01})
def ingest_impl():
    # runs on a "data" node
    return "some data"

@ray.remote(num_cpus=4, resources={"ray.worker.ml": 0.01})
def train_impl(data):
    # runs on an "ml" node
    return f"trained on {data}"

@task
def ingest():
    return ray.get(ingest_impl.remote())

@task
def train(data):
    return ray.get(train_impl.remote(data))

@flow(task_runner=RayTaskRunner(address="auto"))
def pipeline():
    d = ingest.submit()
    m = train.submit(d)
    return m.result()

if __name__ == "__main__":
    pipeline()
Notes:
- The small fractional amount (0.01) is a common Ray trick to “require presence” of a custom resource without consuming the whole unit.
- If you prefer, you can use .options(...) per call instead of decorating the function, e.g. train_impl.options(resources={"ray.worker.ml": 0.01}).remote(data); see the sketch below.
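A minimal sketch of that call-site variant, reusing the resource labels above (the task and function names are just illustrative):

import ray
from prefect import task

# No resources pinned at definition time
@ray.remote
def train_impl(data):
    return f"trained on {data}"

@task
def train(data, node_resource: str = "ray.worker.ml"):
    # Pick the target node type per call via .options(...)
    pinned = train_impl.options(num_cpus=4, resources={node_resource: 0.01})
    return ray.get(pinned.remote(data))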
3) If you need tighter packing or multi-resource bundles
Use placement groups and pass a scheduling strategy:
- Ray placement groups
Example sketch:
import ray
from prefect import task
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# Give the placement group a name so it can be looked up by name later
pg = placement_group([{"CPU": 4}], strategy="PACK", name="train-pg")
ray.get(pg.ready())

@ray.remote
def train_impl(data):
    return f"trained on {data}"

@task
def train(data, pg_name="train-pg"):
    strategy = PlacementGroupSchedulingStrategy(
        placement_group=ray.util.get_placement_group(pg_name),
        placement_group_capture_child_tasks=True,
    )
    return ray.get(train_impl.options(scheduling_strategy=strategy).remote(data))
4) Prefect deployment and worker type are separate
- Use any Prefect work pool/worker type (process, docker, kubernetes) for your deployment. That only decides where the flow process runs.
- The Ray placement happens inside the flow via the resource hints shown above.
- Make sure the Prefect worker environment can reach your Ray cluster and has prefect-ray + ray installed. Your Ray tasks will execute within the Ray worker node’s Docker image for that node type.
Useful references, because of course you wanted to read more:
- Ray custom resources: Custom resources and scheduling
- Placement groups: Placement group scheduling
- prefect-ray collection: prefect-ray docs
Marvin (08/09/2025, 12:05 AM):

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options
import os, time, random

@task
def test(x):
    print("test task started", x)
    time.sleep(5)
    print("test task finished", x)
    return x

@flow(log_prints=True, task_runner=RayTaskRunner(
    address=f"ray://{os.getenv('RAY_HEAD_NODE_IP')}:10001"
))
def test_ray():
    # create two groups: one should run on data nodes, the other on ml nodes
    data_args = [random.random() for _ in range(5)]
    ml_args = [random.random() for _ in range(5)]

    # Send these to the "data" node type
    with remote_options(resources={"ray.worker.data": 0.01}, num_cpus=1):
        data_futures = [test.submit(a) for a in data_args]
        # or: test.map(data_args)

    # Send these to the "ml" node type
    with remote_options(resources={"ray.worker.ml": 0.01}, num_cpus=2, num_gpus=1):
        ml_futures = [test.submit(a) for a in ml_args]
        # or: test.map(ml_args)

    # Wait / collect if you need results
    return [f.result() for f in data_futures + ml_futures]

if __name__ == "__main__":
    test_ray()
Notes and gotchas:
- The remote_options context applies to any Prefect task submits within it (submit or map). No @ray.remote needed.
- Use a small fractional amount for the custom resource (e.g., 0.01) so you only “require presence” of that node type without consuming it entirely.
- Add Ray CPU/GPU asks alongside the resource to match the node’s capacity, e.g., num_gpus=1 for your ML nodes.
- Your Prefect worker type (process/docker/k8s) is unrelated; it just hosts the flow process that connects to Ray. The Ray scheduling decisions happen inside the flow run.
- Ensure those resource keys actually exist on your nodes. You can verify in a Ray shell:
import ray; ray.init(address="auto"); [n["Resources"] for n in ray.nodes()]
If you need to target more than two types, just add more remote_options blocks with the appropriate resource key. If you want to switch at runtime based on arguments, branch into the right context depending on your logic, for example:
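A minimal sketch of that branching, reusing the resource labels above (the flow, task, and parameter names are illustrative):

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
    return x * 2

@flow(task_runner=RayTaskRunner(address="auto"))
def routed_flow(items: list[int], target: str = "data"):
    # Pick the Ray custom resource based on a runtime argument
    resource = "ray.worker.ml" if target == "ml" else "ray.worker.data"
    with remote_options(resources={resource: 0.01}, num_cpus=1):
        futures = [process.submit(i) for i in items]
    return [f.result() for f in futures]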
Docs for further misery reading:
- Custom resources and scheduling in Ray: Ray custom resources
- Placement groups (for stricter packing/affinity): Ray placement groups
- Prefect-ray collection: prefect-ray docs
If you share the exact resource labels and any GPU/CPU requirements per node type, I'll tailor the context blocks so your tasks land exactly where you want.

Saad80 (08/09/2025, 12:06 AM):

with remote_options(resources={"ray.worker.data": 0.01}, num_cpus=1): ?

Marvin (08/09/2025, 12:07 AM):

The remote_options context manager lets you apply Ray's .options(...) to all Prefect task submissions inside the block.
Good places to look:
- Prefect-Ray docs home: prefect-ray docs
- Source where remote_options is defined: prefect_ray/context.py
- Example usage in a PrefectHQ PR discussion (shows the exact with remote_options(resources=...) pattern): PR #82
Since remote_options forwards to Ray's task options, the accepted keywords map to Ray's scheduling options (e.g., resources, num_cpus, num_gpus, scheduling_strategy, etc.). For those knobs, see Ray's docs:
- Custom resources and scheduling: Ray custom resources
- Task options overview: Ray task scheduling
Quick reminder of usage:
from prefect_ray.context import remote_options

with remote_options(resources={"ray.worker.data": 0.01}, num_cpus=1):
    # any Prefect task submits inside will carry these Ray options
    futs = [my_task.submit(x) for x in items]

If you need, I can dig up a direct anchor in the generated API docs for remote_options, but the source link above is the canonical definition.