
Aric Huang

05/03/2023, 8:15 PM
Hi, I'm seeing a strange issue when using Ray inside a flow that doesn't use `RayTaskRunner`. Ray is initialized from within a `task`, where an `ActorPool` is used to run some logic, and then another `task` runs which may take a few minutes. About a minute after the task using Ray completes, the entire flow crashes with the following error:
gcs_rpc_client.h:533: Failed to connect to GCS within 60 seconds. GCS may have been killed. It's either GCS is terminated by `ray stop` or is killed unexpectedly. If it is killed unexpectedly, see the log file gcs_server.out. <https://docs.ray.io/en/master/ray-observability/ray-logging.html#logging-directory-structure>. The program will terminate.
I have a minimal example here: https://gist.github.com/concreted/6d9f4a1165fc79c510f9a63ac28363b4 Saving that to a file `ray_test.py` and running `python ray_test.py` should reproduce the issue. I see the issue on Ubuntu 18.04, but not on Mac.
This is using `ray==2.4.0`, which should be the latest version.
I switched to using `RayTaskRunner` and it doesn't crash anymore, but it doesn't seem like `RayTaskRunner` should be required just to use Ray within a task.
One thing I found is that when the task using Ray runs in the example flow, a `gcs_server` process starts, which is expected:
$ ps -aux | grep gcs_server
aric      7796  2.8  0.0 806636 27740 pts/6    Sl+  19:55   0:00 /opt/pyenv/versions/3.8.12/lib/python3.8/site-packages/ray/core/src/ray/gcs/gcs_server --redis_address=10.128.0.80 --redis_port=6379 ... -gcs_server_port=0 --metrics-agent-port=65472 --node-ip-address=10.128.0.80 ...
However, as soon as the task completes, the `gcs_server` process is listed as `defunct`:
$ ps -aux | grep gcs_server
aric      7796  2.5  0.0      0     0 pts/6    Z+   19:55   0:00 [gcs_server] <defunct>
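For anyone unfamiliar with the `Z+` / `<defunct>` state in the `ps` output above: it means the process has already exited, but its parent has not yet collected its exit status. So `gcs_server` is no longer running at that point, even though its PID is still listed. The Linux-only sketch below reproduces that state with an unrelated, short-lived child process. It only illustrates what "defunct" means and says nothing about why `gcs_server` exits under Prefect; `process_state` and `observe_zombie` are hypothetical helper names, not part of Ray or Prefect.

```python
import subprocess
import sys
import time


def process_state(pid):
    """Return the one-letter state from /proc/<pid>/stat, or None if the PID is gone (Linux only)."""
    try:
        with open(f"/proc/{pid}/stat") as f:
            stat = f.read()
    except FileNotFoundError:
        return None
    # Format is "pid (comm) state ..."; split after the last ")" since comm may contain spaces.
    return stat.rsplit(")", 1)[1].split()[0]


def observe_zombie(timeout=5.0):
    """Start a short-lived child, leave it unreaped, and report its state before and after wait()."""
    child = subprocess.Popen([sys.executable, "-c", "pass"])
    deadline = time.monotonic() + timeout
    state = process_state(child.pid)
    # The child exits almost immediately; until the parent waits on it, it stays a zombie ("Z").
    while state != "Z" and time.monotonic() < deadline:
        time.sleep(0.05)
        state = process_state(child.pid)
    child.wait()  # reaping the child removes its entry from the process table
    return state, process_state(child.pid)


if __name__ == "__main__":
    before, after = observe_zombie()
    print(f"before wait(): {before}, after wait(): {after}")
```

The same `Z` state in the `ps` listing therefore suggests that `gcs_server` terminated as soon as the task finished, and that the Python process that launched it is still alive but has not reaped it.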
Any thoughts on why this is happening? This came up when porting an existing flow from Prefect V1 to V2. The V1 version of the flow works fine with `ray==2.4.0` on the same Ubuntu 18.04 hosts, so something seems to be different in V2.
Equivalent code using regular Python functions does not have this issue, using this minimal example: https://gist.github.com/concreted/37d039f6184add94e912a08322140cc3
^^ When running that script, the `gcs_server` process stays active after the function using Ray returns.
My understanding from the Ray documentation is that `gcs_server` is started automatically when Ray is initialized, and should be shut down automatically when the Python process exits (https://docs.ray.io/en/latest/ray-core/api/doc/ray.shutdown.html):
"This will automatically run at the end when a Python process that uses Ray exits"
However, when running with Prefect it seems to be getting terminated prematurely.
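To make the expected lifetime concrete, here is a minimal sketch of exit-time cleanup using the standard library's `atexit` module. It is a stand-in for the behaviour the Ray docs describe and does not use Ray; the assumption is only that the cleanup is tied to interpreter exit, not to the function that triggered initialization. The names `shutdown`, `use_library`, and `run_child` are hypothetical.

```python
import subprocess
import sys
import textwrap

# Script run in a separate interpreter so that its exit hooks actually fire.
CHILD = textwrap.dedent('''
    import atexit

    def shutdown():
        print("shutdown hook ran")

    def use_library():
        # Stand-in for an init call that registers cleanup for process exit.
        atexit.register(shutdown)
        print("function returned")

    use_library()
    print("script still running")
''')


def run_child():
    """Run the script in a fresh interpreter and return its output lines in order."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.splitlines()


if __name__ == "__main__":
    for line in run_child():
        print(line)
```

The hook runs only after the rest of the script has finished, which matches what the plain-Python example shows for `gcs_server` and is what the Prefect flow does not do.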

Jacob Danovitch

05/12/2023, 1:50 PM
I'm not 100% sure what the origin of the issue is (though maybe it has something to do with whatever magic async stuff Prefect does behind the scenes), but the reason it works with `RayTaskRunner` is that it calls `ray.init()` before running your tasks, which your script doesn't. If you add it to your script like this:
if __name__ == "__main__":
    ray.init()
    ray_test()
Or like this:
@flow(log_prints=True)
def ray_test():
    ray.init()
    test()
    sleep()
It doesn't crash. Tested on Ubuntu 20.04.5 LTS with `prefect==2.10.4` and `ray==2.3.0`; the crash is reproducible without `ray.init()` and disappears with either of the two fixes above.