<@ULVA73B9P> I'm following the docs on prefect-ray...
# ask-marvin
m
@Marvin I'm following the docs on prefect-ray using a remote cluster (local k8s). The Ray tasks that start then fail to connect to prefect at localhost:4200. Should the ray task need to connect to Prefect? Also, passing runtime env with a correct url doesn't get picked up.
m
thought for 32.1 seconds
Yes, this is a common issue when running Ray tasks in a distributed environment. Here's what's happening and how to fix it:
1. Connection Behavior: Yes, Ray tasks do need to connect to the Prefect API to report their status and results. By default, they try to connect to `localhost:4200`, which won't work in a distributed environment where the tasks are running on different nodes.
2. Setting the Correct URL: You need to ensure the Prefect API URL is properly configured in two places:
   a. The Ray runtime environment
   b. The Prefect configuration for the workers
Here's how to address this:
1. Configure the PREFECT_API_URL:
python
   import ray
   from prefect import flow
   from prefect_ray import RayTaskRunner

   ray_runtime_env = {
       "env_vars": {
           "PREFECT_API_URL": "http://your-prefect-api-host:4200/api"
       }
   }

   @flow(task_runner=RayTaskRunner(runtime_env=ray_runtime_env))
   def my_flow():
       pass  # your flow code here
2. Alternative Configuration: You can also set this environment variable in your Ray cluster configuration if you're using a cluster config file:
yaml
   runtime_env:
     env_vars:
       PREFECT_API_URL: "http://your-prefect-api-host:4200/api"
3. Verify Network Access:
   - Ensure your Ray workers can reach the Prefect API URL
   - Check that any necessary network policies in Kubernetes allow this communication
   - The Prefect API URL should be reachable from within your Kubernetes cluster
If the runtime environment settings aren't being picked up, try these troubleshooting steps:
1. Verify the runtime environment is being passed correctly by adding a print statement in your Ray task
2. Check the runtime environment Ray actually applied using `ray.get_runtime_context().runtime_env`
3. Ensure you're not overriding the environment variables elsewhere in your configuration
If you're still having issues, could you share:
1. How you're currently setting the runtime environment?
2. The specific error messages you're seeing?
3. Whether your Prefect API is running inside or outside the Kubernetes cluster?
This would help me provide more specific guidance for your setup.
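The point about `localhost:4200` can be checked mechanically before a flow is submitted. The following is a minimal sketch using only the standard library; `is_loopback_url` is a hypothetical helper name, not part of Prefect or Ray. It flags any API URL whose host is a loopback address, since on a remote Ray worker such a URL refers to the worker itself and not to the machine running the Prefect server.

```python
import ipaddress
from urllib.parse import urlparse


def is_loopback_url(url: str) -> bool:
    """Return True if the URL's host refers to the local machine."""
    host = urlparse(url).hostname
    if host is None:
        raise ValueError(f"no host found in URL: {url!r}")
    if host == "localhost":
        return True
    try:
        # IP literals such as 127.0.0.1 or ::1
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        # Not an IP literal (for example a DNS name); treat as non-loopback.
        return False
```

Calling it on the value you plan to pass as `PREFECT_API_URL` gives an early warning; it does not prove the URL is reachable from the cluster.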
m
@Marvin I have tried both of those things and the env var does not get picked up by the task.
m
thought for 15.9 seconds
Let's try a different approach. Could you try these alternative methods to set the environment variables:
1. Explicit Environment Setting in Task:
python
import os
from prefect import flow, task
from prefect_ray import RayTaskRunner
import ray

@task
def ray_task():
    # Debug print to see what environment variables are actually present
    print(f"PREFECT_API_URL in task: {os.environ.get('PREFECT_API_URL')}")
    # Your task code here

@flow(task_runner=RayTaskRunner(
    runtime_env={
        "env_vars": {"PREFECT_API_URL": "http://your-prefect-api-host:4200/api"}
    }
))
def my_flow():
    ray_task()
2. Initialize Ray Explicitly:
python
import ray
from prefect import flow
from prefect_ray import RayTaskRunner

# Initialize Ray first with runtime env
ray.init(
    address="ray://your-cluster-address",
    runtime_env={
        "env_vars": {"PREFECT_API_URL": "http://your-prefect-api-host:4200/api"}
    }
)

@flow(task_runner=RayTaskRunner())
def my_flow():
    pass  # your flow code
3. Set Working Directory and Environment Together:
python
runtime_env = {
    "env_vars": {"PREFECT_API_URL": "http://your-prefect-api-host:4200/api"},
    "working_dir": ".",  # Sometimes helps with environment propagation
}

@flow(task_runner=RayTaskRunner(runtime_env=runtime_env))
def my_flow():
    pass  # your flow code
4. Try Setting at Ray Task Level:
python
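import ray
from prefect import task

def your_function():
    # Placeholder for the work to run on the Ray cluster
    pass

@task
def ray_task():
    # Wrap the function as a Ray task with its own runtime_env, then wait for it
    remote_fn = ray.remote(
        runtime_env={"env_vars": {"PREFECT_API_URL": "http://your-prefect-api-host:4200/api"}}
    )(your_function)
    return ray.get(remote_fn.remote())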
Could you:
1. Let me know which of these approaches you've already tried?
2. Share what you see if you add the debug print statement to your task?
3. Confirm whether you're using Ray client mode or connecting directly to the cluster?
This would help narrow down where the environment variable setting is failing and suggest more specific solutions.
Also, as a workaround while we solve this, you could try setting the Prefect API URL directly in your Ray cluster's environment (e.g., in the pod spec if using Kubernetes) to ensure it's available to all tasks regardless of runtime environment settings.
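For the debug print suggested above, a slightly more structured report can be easier to compare between the driver and a worker. This is a minimal sketch using only the standard library; `prefect_env_report` is a hypothetical helper, and the masking rule (hide anything whose name contains `KEY`, `PASSWORD`, or `TOKEN`) is an assumption chosen for illustration. It only helps if the task body actually gets to run.

```python
import os


def prefect_env_report(environ=None):
    """Return the PREFECT_* variables, masking values that look secret."""
    environ = os.environ if environ is None else environ
    report = {}
    for name, value in sorted(environ.items()):
        if not name.startswith("PREFECT_"):
            continue
        secret = "KEY" in name or "PASSWORD" in name or "TOKEN" in name
        report[name] = "***" if secret else value
    return report
```

Printing `prefect_env_report()` on the driver and inside a plain Ray task shows whether `PREFECT_API_URL` differs between the two.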
m
@Marvin None of these work. I've tried scheduling ray tasks manually and they can pick up an env config. I cannot schedule tasks and then ask for the env because these fail before that.
m
thought for 13.8 seconds
Let's try a different approach. Since you mentioned you can successfully set environment variables when scheduling Ray tasks manually, could you share a minimal working example of how you're doing that? In the meantime, here's a potential workaround that separates the Ray initialization from the Prefect task runner:
python
import ray
from prefect import flow, task
from prefect_ray import RayTaskRunner

# Initialize Ray connection first
ray.init(address="ray://your-cluster-address")

# Create a Ray actor that will handle the environment setup
@ray.remote
class EnvSetupActor:
    def __init__(self):
        import os
        os.environ["PREFECT_API_URL"] = "http://your-prefect-api-host:4200/api"

    def run_task(self, *args, **kwargs):
        # Your task logic here
        pass

# Then in your Prefect flow
@flow(task_runner=RayTaskRunner())
def my_flow():
    actor = EnvSetupActor.remote()
    # Use the actor to run tasks
    ray.get(actor.run_task.remote())
Could you also clarify:
1. At exactly what point in the execution do you see the failure?
2. What's the specific error message you're seeing?
3. Are you using any specific Ray deployment method (KubeRay, Ray Clusters, etc.)?
This would help us understand if we need to handle the environment setup at a different level of the Ray infrastructure.
m
@Marvin My test flow doesn't fail, it succeeds but no tasks are generated in prefect. In ray the tasks are generated but all fail with this error
Error Type: TASK_EXECUTION_EXCEPTION

User exception:
lib/python3.9/site-packages/prefect/context.py", line 249, in __enter__
 self.client.raise_for_api_version_mismatch()
 File "/tmp/ray/session_2025-07-09_23-19-45_119782_1/runtime_resources/pip/922cac1a5adb6068911de559c4ddd2cd1b20da7c/virtualenv/lib/python3.9/site-packages/prefect/client/orchestration/__init__.py", line 1392, in raise_for_api_version_mismatch
 raise RuntimeError(f"Failed to reach API at {self.api_url}") from e
RuntimeError: Failed to reach API at http://127.0.0.1:4200/api/
Here's my code
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    print(f"#{number}")

@flow(task_runner=RayTaskRunner(
        address="ray://localhost:10001",
        init_kwargs={
            "runtime_env": {
                "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
                "env_vars": {
                    "PREFECT_API_URL": "http://host.docker.internal:4200/api"
                },
            },
        },
    ))
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)
m
thought for 15.2 seconds
Ah, now I see the exact issue. There are a few key problems here:
1. The Ray tasks are still trying to connect to `127.0.0.1:4200` despite the environment variable setting, which suggests the runtime_env isn't being properly inherited
2. The `host.docker.internal` URL suggests you're running in a Docker environment, which adds another layer of networking complexity
3. The way the environment variables are being set in the `init_kwargs` might not be propagating correctly to the Ray workers
Here's a modified version of your code that should work:
python
from prefect import flow, task
from prefect_ray import RayTaskRunner
import ray

# Define the runtime environment at the top level
runtime_env = {
    "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
    "env_vars": {
        "PREFECT_API_URL": "http://host.docker.internal:4200/api"
    }
}

# Initialize Ray explicitly first
ray.init(
    address="ray://localhost:10001",
    runtime_env=runtime_env
)

@task
def shout(number):
    print(f"#{number}")

@flow(
    task_runner=RayTaskRunner(
        # Don't include init_kwargs here since we initialized Ray above
        runtime_env=runtime_env  # Include runtime_env here as well
    )
)
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)
A few additional things to check:
1. Network Access: Ensure your Ray workers can actually reach `host.docker.internal:4200`. You might need to:
   - Add `--add-host=host.docker.internal:host-gateway` to your Docker run command if you're running Ray in Docker
   - Or use the actual host IP address instead of `host.docker.internal`
2. Ray Worker Environment: You can verify the environment in a Ray task with:
python
@task
def shout(number):
    import os
    print(f"PREFECT_API_URL in task: {os.environ.get('PREFECT_API_URL')}")
    print(f"#{number}")
3. Ray Version: Make sure you're using a compatible version of Ray. The latest versions of prefect-ray work best with Ray 2.x.
If this still doesn't work, could you try running a Ray task with this debug code to see what environment variables are actually present in the Ray worker:
python
@task
def debug_env():
    import os
    import socket
    print(f"Environment vars: {dict(os.environ)}")
    print(f"Hostname: {socket.gethostname()}")
    print(f"Can connect to Prefect API: {try_connect_to_prefect()}")

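def try_connect_to_prefect():
    import os  # needed here too: the import in debug_env is local to that function
    import requests
    try:
        # Returns the HTTP status code, or a description of the failure
        response = requests.get(os.environ.get("PREFECT_API_URL"), timeout=5)
        return response.status_code
    except Exception as e:
        return f"Error: {str(e)}"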

@flow(task_runner=RayTaskRunner(runtime_env=runtime_env))
def debug_flow():
    debug_env()
This will help us understand exactly what environment the Ray tasks are running in and why they can't connect to the Prefect API.
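If `requests` is not installed on the worker image, the connectivity half of that check can be done with the standard library alone. The following is a minimal sketch under that assumption; `can_reach` is a hypothetical helper that only tests whether a TCP connection to the URL's host and port can be opened, which is weaker than a successful API call.

```python
import socket
from urllib.parse import urlparse


def can_reach(url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parsed = urlparse(url)
    if parsed.hostname is None:
        raise ValueError(f"no host found in URL: {url!r}")
    # Fall back to the scheme's default port when none is given.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False
```

Running `can_reach(os.environ["PREFECT_API_URL"])` from a shell inside a Ray worker pod separates a networking problem from an environment variable problem.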
m
@Marvin TypeError: __init__() got an unexpected keyword argument 'runtime_env'
m
thought for 9.1 seconds
Ah, I see the issue. I apologize for the confusion. The `RayTaskRunner` doesn't accept `runtime_env` directly as a constructor argument. Let's correct this:
python
from prefect import flow, task
from prefect_ray import RayTaskRunner
import ray

runtime_env = {
    "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
    "env_vars": {
        "PREFECT_API_URL": "http://host.docker.internal:4200/api"
    }
}

# Initialize Ray first with the runtime environment
ray.init(
    address="ray://localhost:10001",
    runtime_env=runtime_env
)

@task
def shout(number):
    import os
    print(f"PREFECT_API_URL in task: {os.environ.get('PREFECT_API_URL')}")
    print(f"#{number}")

@flow(task_runner=RayTaskRunner())  # Remove runtime_env from here
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)
The key changes are:
1. Removed `runtime_env` from the RayTaskRunner constructor
2. Only set the `runtime_env` in the `ray.init()` call
If you're still seeing connection issues after this change, could you try printing the environment variables in the task to see what's actually being set? This will help us understand if the environment variables are being properly propagated to the Ray workers.
m
@Marvin Same issue. Doesn't pick up the env var and fails.
m
thought for 7.8 seconds
Let's try a different approach. Since you mentioned earlier that you can get environment variables to work when scheduling Ray tasks manually, let's try to bypass the RayTaskRunner's initialization and handle the Ray tasks directly:
python
from prefect import flow, task
import ray

runtime_env = {
    "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
    "env_vars": {
        "PREFECT_API_URL": "http://host.docker.internal:4200/api"
    }
}

ray.init(
    address="ray://localhost:10001",
    runtime_env=runtime_env
)

# Define the Ray task separately from the Prefect task
@ray.remote
def ray_shout(number):
    print(f"#{number}")

@task
def submit_ray_task(number):
    # Submit the Ray task directly
    return ray.get(ray_shout.remote(number))

@flow
def count_to(highest_number):
    for i in range(highest_number):
        submit_ray_task(i)

if __name__ == "__main__":
    count_to(10)
Could you also share:
1. How are you successfully setting environment variables in your manual Ray tasks?
2. Are you using any specific Ray job submission method that works?
This would help us understand what's working in your environment and adapt the solution accordingly.
m
@Marvin this works in that the PREFECT_API_URL is set to the value and can be echoed in `ray_shout`.
m
thought for 9.3 seconds
Excellent, now we're getting somewhere. This confirms that the issue lies in how the RayTaskRunner is handling environment propagation versus direct Ray task submission. Since we've confirmed that direct Ray task submission works correctly, let's create a more complete solution that maintains Prefect's task tracking while using Ray's direct submission:
python
from prefect import flow, task
import ray
from typing import Any

runtime_env = {
    "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
    "env_vars": {
        "PREFECT_API_URL": "http://host.docker.internal:4200/api"
    }
}

ray.init(
    address="ray://localhost:10001",
    runtime_env=runtime_env
)

@ray.remote
def ray_shout(number: Any) -> Any:
    print(f"#{number}")
    return number

@task
def submit_ray_tasks(numbers):
    # Submit all tasks at once and get their futures
    futures = [ray_shout.remote(n) for n in numbers]
    # Wait for all results
    return ray.get(futures)

@flow
def count_to(highest_number: int):
    numbers = range(highest_number)
    results = submit_ray_tasks(numbers)
    return results

if __name__ == "__main__":
    count_to(10)
This approach:
1. Properly propagates the environment variables through Ray's native mechanisms
2. Maintains Prefect's task tracking and flow state
3. Takes advantage of Ray's parallel execution
4. Batches the Ray tasks for better performance
I'll create an issue in the prefect-ray repository about this environment variable propagation issue with the RayTaskRunner, as this seems like something that should work out of the box.
Would you like me to show you how to add any additional features to this working version, such as:
1. Error handling for individual Ray tasks
2. Progress tracking
3. Resource configuration for the Ray tasks
4. Custom Ray options per task
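On the first of those, per-task error handling, the batching pattern can be illustrated without a cluster. The sketch below deliberately swaps in `concurrent.futures.ThreadPoolExecutor` as a stand-in for Ray so that it runs anywhere; `run_batch` is a hypothetical helper. With Ray, `fn.remote(item)` and `ray.get(ref)` would take the places of `pool.submit(fn, item)` and `future.result()`.

```python
from concurrent.futures import ThreadPoolExecutor


def run_batch(fn, items, max_workers=4):
    """Run fn over items in parallel; return (results, errors) keyed by item."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit everything first so the work overlaps.
        futures = {item: pool.submit(fn, item) for item in items}
        for item, future in futures.items():
            try:
                results[item] = future.result()
            except Exception as exc:
                # Record the failure and keep collecting the other results.
                errors[item] = exc
    return results, errors
```

Collecting failures per item means one bad input does not discard the results of the rest of the batch, which a single `ray.get(futures)` over the whole list would do.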
m
@Marvin This is a pretty shit solution, but I guess it kinda unlocks us.
m
thought for 10.3 seconds
I'd strongly encourage opening an issue in the prefect-ray repository about this. The environment variable propagation should work correctly through the RayTaskRunner, and this workaround shouldn't be necessary. Would you like me to outline the key points that should be included in such an issue report?
m
@Marvin yep
m
thought for 19.7 seconds
Here's a template for the issue report that captures the core problem:
Title: `RayTaskRunner fails to propagate environment variables to Ray tasks`
markdown
## Issue Description
The RayTaskRunner fails to propagate environment variables to Ray tasks, specifically the `PREFECT_API_URL`. This causes Ray tasks to fail with connection errors to the Prefect API, defaulting to localhost:4200 regardless of environment configuration.

## Reproduction Steps
1. Set up a remote Ray cluster (in my case, local k8s)
2. Run the following code:

```python
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    print(f"#{number}")

@flow(task_runner=RayTaskRunner(
        address="ray://localhost:10001",
        init_kwargs={
            "runtime_env": {
                "pip": ["prefect-ray>=0.4.5", "prefect[ray]>=3.4.7"],
                "env_vars": {
                    "PREFECT_API_URL": "http://host.docker.internal:4200/api"
                },
            },
        },
    ))
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)
## Current Behavior
1. The flow executes successfully
2. No tasks appear in Prefect
3. Ray tasks fail with:
Error Type: TASK_EXECUTION_EXCEPTION
User exception:
lib/python3.9/site-packages/prefect/context.py", line 249, in __enter__
 self.client.raise_for_api_version_mismatch()
 File "/tmp/ray/session_2025-07-09_23-19-45_119782_1/runtime_resources/pip/922cac1a5adb6068911de559c4ddd2cd1b20da7c/virtualenv/lib/python3.9/site-packages/prefect/client/orchestration/__init__.py", line 1392, in raise_for_api_version_mismatch
 raise RuntimeError(f"Failed to reach API at {self.api_url}") from e
RuntimeError: Failed to reach API at http://127.0.0.1:4200/api/
## Expected Behavior
Ray tasks should:
1. Pick up the environment variables specified in the runtime_env
2. Connect to the specified Prefect API URL
3. Report task status back to Prefect

## Workaround
The only working solution found is to bypass the RayTaskRunner entirely and use Ray directly:
python
@ray.remote
def ray_shout(number):
    print(f"#{number}")

@task
def submit_ray_task(number):
    return ray.get(ray_shout.remote(number))

@flow
def count_to(highest_number):
    for i in range(highest_number):
        submit_ray_task(i)
This workaround confirms that Ray itself can handle the environment variables correctly, but loses many benefits of the Prefect task system.

## Environment Details
- Prefect version: 3.4.7
- prefect-ray version: 0.4.5
- Ray version: [your Ray version]
- Python version: 3.9
- Environment: Kubernetes cluster with Ray deployed

## Additional Context
- Environment variables work correctly when scheduling Ray tasks manually
- The issue appears to be specific to how RayTaskRunner handles environment propagation
- The problem occurs before task execution, during the initial connection attempt to Prefect
```
Would you like me to add or modify anything in this template before you submit the issue?
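To fill in the "Environment Details" section, including the Ray version left as a placeholder, the installed versions can be read programmatically. This is a minimal sketch using the standard library's `importlib.metadata`; `collect_versions` is a hypothetical helper, and the default package names are assumed to match the distribution names installed in your environment.

```python
import platform
from importlib import metadata


def collect_versions(packages=("prefect", "prefect-ray", "ray")):
    """Map each package name to its installed version, or 'not installed'."""
    versions = {"python": platform.python_version()}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = "not installed"
    return versions
```

Running it both on the driver and inside a Ray worker is worthwhile, since the worker's `pip` runtime environment may resolve different versions than the driver has.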