Hi all! I am currently trying to use Prefect together with Ray using the @flow(task_runner=RayTaskRunner()) annotation. While everything works fine on the head node of the cluster, on the workers the flows continuously fail with the message:
(begin_task_run pid=1659, ip= File "/home/ray/anaconda3/lib/python3.7/site-packages/prefect/orion/models/task_runs.py", line 282, in set_task_run_state
(begin_task_run pid=1659, ip= raise ValueError(f"Invalid task run: {task_run_id}")
(begin_task_run pid=1659, ip= ValueError: Invalid task run: 3e46e4ce-c740-47d8-9ac7-79951b6fd98f
Any idea why? I am using ray 1.12.1, prefect 2.0b5 and python 3.7.7.
Can I see what your flow looks like?
Sure, it's something like this:
import boto3
from pathlib import Path
from prefect import flow, task
# Import path as in the Prefect 2.0 betas; it may differ in other releases.
from prefect.task_runners import RayTaskRunner

@task
def upload_to_s3(bucket_name, key=None, text=None):
    # Fall back to a dummy key and payload when none are given.
    if key is None:
        key = 'test.txt'
    if text is None:
        text = b'Here we have some data'
    s3 = boto3.resource('s3')
    s3_object = s3.Object(bucket_name, key)
    s3_object.put(Body=text)

@flow(task_runner=RayTaskRunner())
def reading_and_uploading(paths, bucket_name):
    for path in paths:
        local_data = read_local_data(path)
        loaded_local_data = local_data.result()
        upload_to_s3(bucket_name, path, loaded_local_data)

@task
def read_local_data(path):
    with open(path, 'r') as f:
        return f.read()

if __name__ == "__main__":
    reading_and_uploading([str(Path.home())+"/efs/test1.txt", str(Path.home())+"/efs/test2.txt"], "ray-cluster-test-bucket")
Can you explain the problem you are trying to solve? Based on your flow, it looks like you are performing IO operations. For that, the default task runner will likely be faster and solve your problem in a much easier way, with less complexity involved.
Try the same using just @flow instead of @flow(task_runner=RayTaskRunner()).
So the goal is to try out how Ray and Prefect work together. The IO operation is just an arbitrary example I chose to test out EFS / AWS.
With the plain @flow annotation it is indeed working, and it also works when I only have the Ray head node running. I am a bit confused why it starts failing only on the worker nodes, with the error mentioned above.
It looks like the Ray workers are not contacting the same database, are they on different machines?
yes they are indeed
Ah yeah, you’ll need to have either a database or an API that is reachable by all of the workers
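One quick way to sanity-check this is to look at the API URL the flow is configured with: if it is unset or points at a loopback address, each machine ends up talking to its own local instance rather than a shared one. A minimal sketch, assuming the URL is available as a plain string; the helper name api_url_is_shared is made up for illustration and is not part of Prefect or Ray:

```python
from typing import Optional
from urllib.parse import urlparse

# Hosts that only ever resolve to the machine the code is running on.
LOOPBACK_HOSTS = {"localhost", "127.0.0.1", "::1"}


def api_url_is_shared(url: Optional[str]) -> bool:
    """Return True if the URL could plausibly be reached from other machines.

    Hypothetical helper: it only inspects the host part of the URL and
    does not try to open a connection.
    """
    if not url:
        # No URL configured at all, so nothing is shared between machines.
        return False
    host = urlparse(url).hostname
    return host is not None and host not in LOOPBACK_HOSTS
```

This only catches the obvious misconfiguration; a non-loopback URL can still be blocked by firewalls or security groups between the nodes.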
oh okay, that makes sense
Do you have any further documentation on how to set this up, by chance?
You can:
• Use Cloud
• Set up an API with prefect orion start and set the API URL (PREFECT_API_URL) for your flow (it should be forwarded to the workers)
• Set up a Postgres database and set the database connection URL (also forwarded to the workers; this is probably the worst option)
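For the second option, a rough sketch of pointing a flow script at a shared API could look like the following. The host name head-node.internal and the helper orion_api_url are placeholders invented for this example; port 4200 and the /api path are assumed to match the Orion server defaults, and the variable is assumed to need setting before prefect is imported (exporting it in the shell is the equivalent):

```python
import os


def orion_api_url(host: str, port: int = 4200) -> str:
    """Build the API URL for an Orion server (hypothetical helper).

    Assumes the server listens on `port` and serves its API under /api.
    """
    return f"http://{host}:{port}/api"


# "head-node.internal" is a placeholder; use an address that every
# Ray worker can actually resolve and reach.
os.environ["PREFECT_API_URL"] = orion_api_url("head-node.internal")

print(os.environ["PREFECT_API_URL"])
```

The server itself also has to listen on an interface the workers can reach, not just on the loopback address of the machine it runs on.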
I don’t think we have a tutorial on this yet, cc @terrence we can at least add a note to the Ray/Dask tutorial to start
The first point refers to Prefect Cloud, I assume? So it would be a feature of that?
One advantage of Prefect Cloud is that we’ve set up the API somewhere for you and it’s always accessible
Alright got it
So you don’t have to worry about networking and such
Thanks a lot to all of you for the extremely quick support here. I'll try it out.
Good callout. The current Ray tutorial does not demonstrate using a non-local Ray instance. I happen to be in the process of reworking these docs, so I will note that we need to cover this scenario in more detail.
