Mal Curtis
07/11/2025, 4:59 AM

Marvin
07/11/2025, 4:59 AM
`localhost:4200`, which won't work in a distributed environment where the tasks are running on different nodes.
2. Setting the Correct URL: You need to ensure the Prefect API URL is properly configured in two places:
a. The Ray runtime environment
b. The Prefect configuration for the workers
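A quick way to spot this class of problem is to check whether a configured URL only points at the local machine. The helper below, `points_at_loopback`, is hypothetical (not part of Prefect or Ray) and uses only the standard library. It cannot catch a DNS name that happens to resolve to 127.0.0.1.

```python
import ipaddress
from urllib.parse import urlparse


def points_at_loopback(api_url: str) -> bool:
    """Return True if api_url targets the local machine (localhost, 127.x.x.x, ::1).

    Such a URL only works for processes on the same host as the Prefect server,
    so a Ray worker on another node (or in another container) cannot use it.
    """
    host = urlparse(api_url).hostname  # lower-cased, IPv6 brackets stripped
    if host is None:
        raise ValueError(f"No hostname in {api_url!r}")
    if host == "localhost":
        return True
    try:
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        # A DNS name other than "localhost"; we do not resolve it here.
        return False


for url in (
    "http://127.0.0.1:4200/api",
    "http://your-prefect-api-host:4200/api",
):
    print(url, "->", "loopback" if points_at_loopback(url) else "not loopback")
```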
Here's how to address this:
1. Configure the `PREFECT_API_URL`:
```python
from prefect import flow
from prefect_ray import RayTaskRunner

ray_runtime_env = {
    "env_vars": {
        "PREFECT_API_URL": "http://your-prefect-api-host:4200/api"
    }
}

# RayTaskRunner passes init_kwargs through to ray.init(); it has no runtime_env argument
@flow(task_runner=RayTaskRunner(init_kwargs={"runtime_env": ray_runtime_env}))
def my_flow():
    ...  # your flow code here
```
2. Alternative Configuration:
You can also set this environment variable in your Ray cluster configuration if you're using a cluster config file:
```yaml
runtime_env:
  env_vars:
    PREFECT_API_URL: "http://your-prefect-api-host:4200/api"
```
3. Verify Network Access:
- Ensure your Ray workers can reach the Prefect API URL
- Check that any necessary network policies in Kubernetes allow this communication
- The Prefect API URL should be reachable from within your Kubernetes cluster
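For point 3, here is a minimal reachability check you could run from inside a worker (for example from a plain `@ray.remote` function, or from a Python shell in a worker pod). It is a sketch: `check_prefect_api` is a hypothetical helper, it uses only `urllib` so nothing extra needs installing, and it assumes the Prefect server answers on a `/health` route under the API prefix. Any HTTP response at all still shows the host is reachable.

```python
import urllib.error
import urllib.request


def check_prefect_api(api_url: str, timeout: float = 5.0) -> tuple[bool, str]:
    """Try GET <api_url>/health and report whether the server answered."""
    url = api_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return True, f"HTTP {resp.status} from {url}"
    except urllib.error.HTTPError as exc:
        # The server responded, just not with a success status.
        return True, f"HTTP {exc.code} from {url} (reachable, unexpected status)"
    except (urllib.error.URLError, OSError) as exc:
        # DNS failure, connection refused, timeout, etc.
        return False, f"cannot reach {url}: {exc}"
```

Usage, from the worker's side: `print(check_prefect_api("http://your-prefect-api-host:4200/api"))`. A `False` result points at networking (DNS, routing, network policies) rather than at how the variable was propagated.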
If the runtime environment settings aren't being picked up, try these troubleshooting steps:
1. Verify the runtime environment is being passed correctly by adding a print statement in your Ray task
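2. Check which runtime environment Ray actually applied: inside a task, `ray.get_runtime_context().runtime_env` returns it, and runtime env setup problems are logged under `/tmp/ray/session_latest/logs/` on the worker nodes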
3. Ensure you're not overriding the environment variables elsewhere in your configuration
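For step 1, rather than printing a single variable, a helper like the hypothetical `prefect_env_report` below lists which `PREFECT_*` variable names a worker actually received (names only, so an API key is never printed) plus the API URL value. It uses only the standard library, so it can be called from a plain Ray function as well as from a Prefect task.

```python
import os
from typing import Mapping, Optional


def prefect_env_report(environ: Optional[Mapping[str, str]] = None) -> dict:
    """Summarize the PREFECT_* variables visible to the current process.

    Call it on the worker to see what the worker received, rather than
    what the driver configured.
    """
    env = os.environ if environ is None else environ
    prefect_vars = {k: v for k, v in env.items() if k.startswith("PREFECT_")}
    return {
        "PREFECT_API_URL": prefect_vars.get("PREFECT_API_URL"),
        "api_url_set": "PREFECT_API_URL" in prefect_vars,
        # Only the names are listed, so secrets such as PREFECT_API_KEY stay hidden.
        "all_prefect_vars": sorted(prefect_vars),
    }


print(prefect_env_report())
```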
If you're still having issues, could you share:
1. How you're currently setting the runtime environment?
2. The specific error messages you're seeing?
3. Whether your Prefect API is running inside or outside the Kubernetes cluster?
This would help me provide more specific guidance for your setup.

Mal Curtis
07/11/2025, 5:01 AM

Marvin
07/11/2025, 5:01 AM
```python
import os
from prefect import flow, task
from prefect_ray import RayTaskRunner
import ray

@task
def ray_task():
    # Debug print to see what environment variables are actually present
    print(f"PREFECT_API_URL in task: {os.environ.get('PREFECT_API_URL')}")
    # Your task code here

# The runtime_env goes inside init_kwargs, which RayTaskRunner passes to ray.init()
@flow(task_runner=RayTaskRunner(
    init_kwargs={
        "runtime_env": {
            "env_vars": {"PREFECT_API_URL": "http://your-prefect-api-host:4200/api"}
        }
    }
))
def my_flow():
    ray_task()
```
2. Initialize Ray Explicitly:
```python
import ray
from prefect import flow
from prefect_ray import RayTaskRunner

# Initialize Ray first with runtime env
ray.init(
    address="ray://your-cluster-address",
    runtime_env={
        "env_vars": {"PREFECT_API_URL": "http://your-prefect-api-host:4200/api"}
    }
)

@flow(task_runner=RayTaskRunner())
def my_flow():
    ...  # your flow code
```
3. Set Working Directory and Environment Together:
```python
from prefect import flow
from prefect_ray import RayTaskRunner

runtime_env = {
    "env_vars": {"PREFECT_API_URL": "http://your-prefect-api-host:4200/api"},
    "working_dir": ".",  # Also ships the local directory to the workers
}

@flow(task_runner=RayTaskRunner(init_kwargs={"runtime_env": runtime_env}))
def my_flow():
    ...  # your flow code
```
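4. Try Setting at Ray Task Level:
```python
import ray
from prefect import task

def your_function():
    # Placeholder for the work you want Ray to run
    import os
    return os.environ.get("PREFECT_API_URL")

@task
def ray_task():
    return ray.get(
        ray.remote(
            runtime_env={"env_vars": {"PREFECT_API_URL": "http://your-prefect-api-host:4200/api"}}
        )(your_function).remote()
    )
```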
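Whether the task-level option in step 4 can fix the URL depends on how Ray combines it with the job-level `runtime_env`. Ray's runtime-environment documentation describes an inheritance rule: `env_vars` from parent and child are merged, while any other field the child sets replaces the parent's. The pure-Python model below is our reading of that rule, not Ray's implementation, so it is worth double-checking against the docs for your Ray version. It shows a task-level `PREFECT_API_URL` overriding the job-level one while the job's `pip` list is kept.

```python
def effective_runtime_env(parent: dict, child: dict) -> dict:
    """Model how a task-level runtime_env combines with the job-level one.

    Our reading of Ray's documented inheritance rule: env_vars are merged
    (child wins on a conflicting key); any other field the child sets
    replaces the parent's value, and fields the child omits are inherited.
    """
    result = {k: v for k, v in parent.items() if k != "env_vars"}
    result.update({k: v for k, v in child.items() if k != "env_vars"})
    env_vars = {**parent.get("env_vars", {}), **child.get("env_vars", {})}
    if env_vars:
        result["env_vars"] = env_vars
    return result


job_env = {
    "pip": ["prefect[ray]>=3.4.7"],
    "env_vars": {"PREFECT_API_URL": "http://127.0.0.1:4200/api"},
}
task_env = {"env_vars": {"PREFECT_API_URL": "http://your-prefect-api-host:4200/api"}}
print(effective_runtime_env(job_env, task_env))
```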
Could you:
1. Let me know which of these approaches you've already tried?
2. Share what you see if you add the debug print statement to your task?
3. Confirm whether you're using Ray client mode or connecting directly to the cluster?
This would help narrow down where the environment variable setting is failing and suggest more specific solutions.
Also, as a workaround while we solve this, you could try setting the Prefect API URL directly in your Ray cluster's environment (e.g., in the pod spec if using Kubernetes) to ensure it's available to all tasks regardless of runtime environment settings.

Mal Curtis
07/11/2025, 5:08 AM

Marvin
07/11/2025, 5:08 AM
```python
import ray
from prefect import flow, task
from prefect_ray import RayTaskRunner

# Initialize Ray connection first
ray.init(address="ray://your-cluster-address")

# Create a Ray actor that will handle the environment setup
@ray.remote
class EnvSetupActor:
    def __init__(self):
        import os
        os.environ["PREFECT_API_URL"] = "http://your-prefect-api-host:4200/api"

    def run_task(self, *args, **kwargs):
        # Your task logic here
        pass

# Then in your Prefect flow
@flow(task_runner=RayTaskRunner())
def my_flow():
    actor = EnvSetupActor.remote()
    # Use the actor to run tasks
    ray.get(actor.run_task.remote())
```
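The actor approach works by mutating `os.environ` inside the actor's own process, which only affects that process and only code that reads the variable afterwards. If you would rather scope such a change than leave it set for the actor's lifetime, a small context manager is one option. This is a sketch: `env_overrides` is a hypothetical helper, and we have not verified when Prefect reads `PREFECT_API_URL`, so it may not help if the settings were already loaded before the block is entered.

```python
import os
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def env_overrides(**overrides: str) -> Iterator[None]:
    """Temporarily set environment variables in the current process only."""
    saved = {key: os.environ.get(key) for key in overrides}
    os.environ.update(overrides)
    try:
        yield
    finally:
        # Restore previous values, removing keys that did not exist before.
        for key, old_value in saved.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value


# Hypothetical use inside an actor method or a plain Ray function:
with env_overrides(PREFECT_API_URL="http://your-prefect-api-host:4200/api"):
    print(os.environ["PREFECT_API_URL"])
```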
Could you also clarify:
1. At exactly what point in the execution do you see the failure?
2. What's the specific error message you're seeing?
3. Are you using any specific Ray deployment method (KubeRay, Ray Clusters, etc.)?
This would help us understand if we need to handle the environment setup at a different level of the Ray infrastructure.

Mal Curtis
07/11/2025, 5:13 AM
```
Error Type: TASK_EXECUTION_EXCEPTION
User exception:
lib/python3.9/site-packages/prefect/context.py", line 249, in __enter__
    self.client.raise_for_api_version_mismatch()
  File "/tmp/ray/session_2025-07-09_23-19-45_119782_1/runtime_resources/pip/922cac1a5adb6068911de559c4ddd2cd1b20da7c/virtualenv/lib/python3.9/site-packages/prefect/client/orchestration/__init__.py", line 1392, in raise_for_api_version_mismatch
    raise RuntimeError(f"Failed to reach API at {self.api_url}") from e
RuntimeError: Failed to reach API at http://127.0.0.1:4200/api/
```
Here's my code:
```python
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    print(f"#{number}")

@flow(task_runner=RayTaskRunner(
    address="ray://localhost:10001",
    init_kwargs={
        "runtime_env": {
            "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
            "env_vars": {
                "PREFECT_API_URL": "http://host.docker.internal:4200/api"
            },
        },
    },
))
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)
```
Marvin
07/11/2025, 5:13 AM
`127.0.0.1:4200` despite the environment variable setting, which suggests the `runtime_env` isn't being properly inherited
2. The `host.docker.internal` URL suggests you're running in a Docker environment, which adds another layer of networking complexity
3. The way the environment variables are being set in the `init_kwargs` might not be propagating correctly to the Ray workers
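To confirm which URL the worker actually used, you can pull it out of the `Failed to reach API at ...` line and compare it with the configured value. A minimal sketch: `failed_api_url` is a hypothetical helper, and the regex assumes the message format shown in the traceback above (with or without Slack-style angle brackets).

```python
import re
from typing import Optional

# Matches the last line of the traceback, tolerating optional <...> around the URL.
_FAILED_API = re.compile(r"Failed to reach API at <?(\S+?)>?\s*$", re.MULTILINE)


def failed_api_url(traceback_text: str) -> Optional[str]:
    """Return the API URL from a 'Failed to reach API at ...' message, if present."""
    match = _FAILED_API.search(traceback_text)
    return match.group(1) if match else None


configured = "http://host.docker.internal:4200/api"
error_line = "RuntimeError: Failed to reach API at http://127.0.0.1:4200/api/"
used = failed_api_url(error_line)
if used and used.rstrip("/") != configured.rstrip("/"):
    print(f"Worker used {used}, not the configured {configured}")
```

If the URLs differ, the problem is which setting the worker picked up rather than network reachability.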
Here's a modified version of your code that should work:
```python
from prefect import flow, task
from prefect_ray import RayTaskRunner
import ray

# Define the runtime environment at the top level
runtime_env = {
    "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
    "env_vars": {
        "PREFECT_API_URL": "http://host.docker.internal:4200/api"
    }
}

# Initialize Ray explicitly first
ray.init(
    address="ray://localhost:10001",
    runtime_env=runtime_env
)

@task
def shout(number):
    print(f"#{number}")

@flow(
    task_runner=RayTaskRunner(
        # Don't include init_kwargs here since we initialized Ray above
        runtime_env=runtime_env  # Include runtime_env here as well
    )
)
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)
```
A few additional things to check:
1. Network Access: Ensure your Ray workers can actually reach `host.docker.internal:4200`. You might need to:
   - Add `--add-host=host.docker.internal:host-gateway` to your Docker run command if you're running Ray in Docker
   - Or use the actual host IP address instead of `host.docker.internal`
2. Ray Worker Environment: You can verify the environment in a Ray task with:
```python
@task
def shout(number):
    import os
    print(f"PREFECT_API_URL in task: {os.environ.get('PREFECT_API_URL')}")
    print(f"#{number}")
```
3. Ray Version: Make sure you're using a compatible version of Ray. The latest versions of prefect-ray work best with Ray 2.x.
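For the network-access point, one way to act on "use the actual host IP instead" is to test, on the worker, whether the API hostname resolves at all and swap in a known address if it doesn't. This is a sketch: `api_url_with_resolvable_host` is a hypothetical helper, and the fallback address you pass in is a placeholder for your host's real IP.

```python
import socket
from urllib.parse import urlparse, urlunparse


def api_url_with_resolvable_host(api_url: str, fallback_host: str) -> str:
    """Keep api_url if its hostname resolves from this machine, else swap in fallback_host.

    Run it where the Ray workers run: host.docker.internal typically resolves only
    inside Docker Desktop or when the container was started with --add-host.
    fallback_host should be a hostname or IPv4 address (IPv6 would need brackets).
    """
    parts = urlparse(api_url)
    if parts.hostname is None:
        raise ValueError(f"No hostname in {api_url!r}")
    try:
        socket.getaddrinfo(parts.hostname, parts.port or 80)
        return api_url
    except socket.gaierror:
        # Rebuild the netloc with the fallback host, keeping the original port.
        netloc = fallback_host if parts.port is None else f"{fallback_host}:{parts.port}"
        return urlunparse(parts._replace(netloc=netloc))
```

For example, `api_url_with_resolvable_host("http://host.docker.internal:4200/api", "192.168.1.20")` returns the original URL where the name resolves and `http://192.168.1.20:4200/api` where it does not. Resolving is only half the story; the port must still be reachable.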
If this still doesn't work, could you try running a Ray task with this debug code to see what environment variables are actually present in the Ray worker:
```python
@task
def debug_env():
    import os
    import socket
    print(f"Environment vars: {dict(os.environ)}")
    print(f"Hostname: {socket.gethostname()}")
    print(f"Can connect to Prefect API: {try_connect_to_prefect()}")

def try_connect_to_prefect():
    import os
    try:
        # requests may not be installed on the worker; an ImportError is reported below
        import requests
        response = requests.get(os.environ.get("PREFECT_API_URL"), timeout=5)
        return response.status_code
    except Exception as e:
        return f"Error: {str(e)}"

@flow(task_runner=RayTaskRunner(runtime_env=runtime_env))
def debug_flow():
    debug_env()
```
This will help us understand exactly what environment the Ray tasks are running in and why they can't connect to the Prefect API.

Mal Curtis
07/11/2025, 5:15 AM

Marvin
07/11/2025, 5:15 AM
`RayTaskRunner` doesn't accept `runtime_env` directly as a constructor argument. Let's correct this:
```python
from prefect import flow, task
from prefect_ray import RayTaskRunner
import ray

runtime_env = {
    "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
    "env_vars": {
        "PREFECT_API_URL": "http://host.docker.internal:4200/api"
    }
}

# Initialize Ray first with the runtime environment
ray.init(
    address="ray://localhost:10001",
    runtime_env=runtime_env
)

@task
def shout(number):
    import os
    print(f"PREFECT_API_URL in task: {os.environ.get('PREFECT_API_URL')}")
    print(f"#{number}")

@flow(task_runner=RayTaskRunner())  # Remove runtime_env from here
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)
```
The key changes are:
1. Removed `runtime_env` from the `RayTaskRunner` constructor
2. Only set the `runtime_env` in the `ray.init()` call
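Because the same dict (a `pip` list plus `env_vars`) gets passed around in several of these snippets, it can help to build it in one place so an edit in one spot can't silently drop the other key. A minimal sketch; `with_env_vars` is a hypothetical helper, and the values are the ones used in this thread.

```python
import copy


def with_env_vars(runtime_env: dict, **env_vars: str) -> dict:
    """Return a copy of a Ray runtime_env dict with extra env_vars merged in.

    Other keys such as "pip" are kept; the new values win on a conflicting key.
    The input dict is not modified.
    """
    merged = copy.deepcopy(runtime_env)
    merged.setdefault("env_vars", {}).update(env_vars)
    return merged


base = {"pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"]}
runtime_env = with_env_vars(base, PREFECT_API_URL="http://host.docker.internal:4200/api")
print(runtime_env)
# Then, as in the snippet above:
# ray.init(address="ray://localhost:10001", runtime_env=runtime_env)
```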
If you're still seeing connection issues after this change, could you try printing the environment variables in the task to see what's actually being set? This will help us understand if the environment variables are being properly propagated to the Ray workers.

Mal Curtis
07/11/2025, 5:20 AM

Marvin
07/11/2025, 5:21 AM
```python
from prefect import flow, task
import ray

runtime_env = {
    "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
    "env_vars": {
        "PREFECT_API_URL": "http://host.docker.internal:4200/api"
    }
}

ray.init(
    address="ray://localhost:10001",
    runtime_env=runtime_env
)

# Define the Ray task separately from the Prefect task
@ray.remote
def ray_shout(number):
    print(f"#{number}")

@task
def submit_ray_task(number):
    # Submit the Ray task directly
    return ray.get(ray_shout.remote(number))

@flow
def count_to(highest_number):
    for i in range(highest_number):
        submit_ray_task(i)

if __name__ == "__main__":
    count_to(10)
```
Could you also share:
1. How are you successfully setting environment variables in your manual Ray tasks?
2. Are you using any specific Ray job submission method that works?
This would help us understand what's working in your environment and adapt the solution accordingly.

Mal Curtis
07/11/2025, 5:25 AM
`ray_shout`.

Marvin
07/11/2025, 5:25 AM
```python
from prefect import flow, task
import ray
from typing import Any

runtime_env = {
    "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
    "env_vars": {
        "PREFECT_API_URL": "http://host.docker.internal:4200/api"
    }
}

ray.init(
    address="ray://localhost:10001",
    runtime_env=runtime_env
)

@ray.remote
def ray_shout(number: Any) -> Any:
    print(f"#{number}")
    return number

@task
def submit_ray_tasks(numbers):
    # Submit all tasks at once and get their object refs (futures)
    futures = [ray_shout.remote(n) for n in numbers]
    # Wait for all results
    return ray.get(futures)

@flow
def count_to(highest_number: int):
    numbers = range(highest_number)
    results = submit_ray_tasks(numbers)
    return results

if __name__ == "__main__":
    count_to(10)
```
This approach:
1. Properly propagates the environment variables through Ray's native mechanisms
2. Maintains Prefect's task tracking and flow state
3. Takes advantage of Ray's parallel execution
4. Batches the Ray tasks for better performance
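To illustrate point 4, here is the same "submit everything, then wait once" pattern using the standard library's `concurrent.futures` as a local stand-in for Ray: `pool.submit` plays the role of `ray_shout.remote`, and collecting all results plays the role of a single `ray.get(futures)`. It is an analogy, not Ray code. The earlier version called `ray.get` once per number inside the loop, which waits for each task before submitting the next.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def shout(number: int) -> int:
    """Stand-in for the ray_shout remote function."""
    time.sleep(0.1)  # simulate some work
    print(f"#{number}")
    return number


def one_at_a_time(numbers):
    # Like ray.get(ray_shout.remote(n)) inside the loop: each wait blocks the next submit.
    with ThreadPoolExecutor() as pool:
        return [pool.submit(shout, n).result() for n in numbers]


def batched(numbers):
    # Like ray.get([ray_shout.remote(n) for n in numbers]): submit all, then wait once.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(shout, n) for n in numbers]
        return [f.result() for f in futures]


for fn in (batched, one_at_a_time):
    start = time.perf_counter()
    fn(range(10))
    print(f"{fn.__name__}: {time.perf_counter() - start:.2f}s")
```

Both return results in submission order; only the batched version lets the work overlap.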
I'll create an issue in the prefect-ray repository about this environment variable propagation issue with the RayTaskRunner, as this seems like something that should work out of the box.
Would you like me to show you how to add any additional features to this working version, such as:
1. Error handling for individual Ray tasks
2. Progress tracking
3. Resource configuration for the Ray tasks
4. Custom Ray options per task

Mal Curtis
07/11/2025, 5:27 AM

Marvin
07/11/2025, 5:27 AM

Mal Curtis
07/11/2025, 5:27 AM

Marvin
07/11/2025, 5:28 AM
RayTaskRunner fails to propagate environment variables to Ray tasks
````markdown
## Issue Description
The `RayTaskRunner` fails to propagate environment variables to Ray tasks, specifically `PREFECT_API_URL`. This causes Ray tasks to fail with connection errors to the Prefect API, falling back to a local address (`127.0.0.1:4200`) regardless of the environment configuration.

## Reproduction Steps
1. Set up a remote Ray cluster (in my case, local k8s)
2. Run the following code:

```python
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    print(f"#{number}")

@flow(task_runner=RayTaskRunner(
    address="ray://localhost:10001",
    init_kwargs={
        "runtime_env": {
            "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
            "env_vars": {
                "PREFECT_API_URL": "http://host.docker.internal:4200/api"
            },
        },
    },
))
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)
```

## Current Behavior
1. The flow executes successfully
2. No tasks appear in Prefect
3. Ray tasks fail with:

```
Error Type: TASK_EXECUTION_EXCEPTION
User exception:
lib/python3.9/site-packages/prefect/context.py", line 249, in __enter__
    self.client.raise_for_api_version_mismatch()
  File "/tmp/ray/session_2025-07-09_23-19-45_119782_1/runtime_resources/pip/922cac1a5adb6068911de559c4ddd2cd1b20da7c/virtualenv/lib/python3.9/site-packages/prefect/client/orchestration/__init__.py", line 1392, in raise_for_api_version_mismatch
    raise RuntimeError(f"Failed to reach API at {self.api_url}") from e
RuntimeError: Failed to reach API at http://127.0.0.1:4200/api/
```

## Expected Behavior
Ray tasks should:
1. Pick up the environment variables specified in the `runtime_env`
2. Connect to the specified Prefect API URL
3. Report task status back to Prefect

## Workaround
The only working solution found is to bypass the `RayTaskRunner` entirely and use Ray directly:

```python
import ray
from prefect import flow, task

ray.init(
    address="ray://localhost:10001",
    runtime_env={"env_vars": {"PREFECT_API_URL": "http://host.docker.internal:4200/api"}},
)

@ray.remote
def ray_shout(number):
    print(f"#{number}")

@task
def submit_ray_task(number):
    return ray.get(ray_shout.remote(number))

@flow
def count_to(highest_number):
    for i in range(highest_number):
        submit_ray_task(i)
```

This workaround runs only plain Ray functions on the cluster, while the Prefect task itself executes in the local flow process. It sidesteps the connection failure but loses many benefits of the Prefect task system.

## Environment Details
- Prefect version: 3.4.7
- prefect-ray version: 0.4.5
- Ray version: [your Ray version]
- Python version: 3.9
- Environment: Kubernetes cluster with Ray deployed

## Additional Context
- Environment variables work correctly when scheduling Ray tasks manually
- The issue appears to be specific to how `RayTaskRunner` handles environment propagation
- The problem occurs before task execution, during the initial connection attempt to Prefect
````
Would you like me to add or modify anything in this template before you submit the issue?