# prefect-community
c
Hi all! I am currently trying to use Prefect together with Ray via the @flow(task_runner=RayTaskRunner()) decorator. While everything works fine on the head node of the cluster, on the workers the flows continuously fail with the message:
(begin_task_run pid=1659, ip=172.31.2.206)   File "/home/ray/anaconda3/lib/python3.7/site-packages/prefect/orion/models/task_runs.py", line 282, in set_task_run_state
(begin_task_run pid=1659, ip=172.31.2.206)     raise ValueError(f"Invalid task run: {task_run_id}")
(begin_task_run pid=1659, ip=172.31.2.206) ValueError: Invalid task run: 3e46e4ce-c740-47d8-9ac7-79951b6fd98f
Any idea why? I am using Ray 1.12.1, Prefect 2.0b5 and Python 3.7.7.
k
Can I see what your flow looks like?
c
Sure, it's something like this:
import boto3
from pathlib import Path

from prefect import flow, task
# RayTaskRunner ships in prefect.task_runners in the 2.0 betas
from prefect.task_runners import RayTaskRunner


@task
def upload_to_s3(bucket_name, key=None, text=None):
    if key is None:
        key = 'test.txt'
    if text is None:
        text = b'Here we have some data'
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, key)
    obj.put(Body=text)


@flow(task_runner=RayTaskRunner())
def reading_and_uploading(paths, bucket_name):
    for path in paths:
        local_data = read_local_data(path)
        loaded_local_data = local_data.result()
        upload_to_s3(bucket_name, path, loaded_local_data)


@task
def read_local_data(path):
    with open(path, 'r') as f:
        return f.read()


if __name__ == "__main__":
    reading_and_uploading(
        [str(Path.home()) + "/efs/test1.txt", str(Path.home()) + "/efs/test2.txt"],
        "ray-cluster-test-bucket",
    )
a
Can you explain the problem you are trying to solve? Based on your flow, it looks like you are performing IO operations; for that, the default ConcurrentTaskRunner will likely be faster and solve your problem in a much easier way, with less complexity involved.
Try the same using just @flow instead of @flow(task_runner=RayTaskRunner())
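For reference, here is a minimal sketch of that suggestion. It reuses the task and flow names from the snippet above, but the upload step is stubbed out with a print so the example runs without AWS credentials:
```python
from prefect import flow, task


@task
def read_local_data(path):
    with open(path, "r") as f:
        return f.read()


@task
def upload_to_s3(bucket_name, key, text):
    # Stub for illustration; the real task would call boto3 as in the snippet above.
    print(f"Would upload {len(text)} characters to s3://{bucket_name}/{key}")


@flow  # no task_runner argument, so Prefect falls back to the default ConcurrentTaskRunner
def reading_and_uploading(paths, bucket_name):
    for path in paths:
        data = read_local_data(path)  # task calls return PrefectFutures in 2.0b5
        upload_to_s3(bucket_name, path, data.result())
```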
c
So the goal is to try out how Ray and Prefect work together. The IO operation is just an arbitrary example I chose to test out EFS / AWS.
With the plain @flow decorator it is indeed working, and the same when I only have the Ray head node running. I am a bit confused why it starts failing only on the worker nodes, with the mentioned error.
z
It looks like the Ray workers are not contacting the same database; are they on different machines?
🙏 1
c
yes they are indeed
z
Ah yeah, you’ll need either a database or an API that is contactable by all of the workers
c
oh okay, that makes sense
do you have any further documentation on how to realize this by chance?
z
You can:
• Use Cloud
• Set up an API with prefect orion start and set the PREFECT_API_URL for your flow (it should be forwarded to the workers)
• Set up a Postgres database and set the database connection URL (also forwarded to the workers; this is probably the worst option)
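As a rough sketch of the self-hosted option (host, port, and IP below are placeholders, and the exact commands assume the Prefect 2 beta CLI):
```bash
# On a machine every Ray worker can reach (e.g. the head node),
# start the Orion API bound to an externally visible address:
prefect orion start --host 0.0.0.0 --port 4200

# Wherever the flow runs, point the Prefect client at that API
# (replace 172.31.0.10 with the address of the machine above):
prefect config set PREFECT_API_URL="http://172.31.0.10:4200/api"

# Postgres alternative: share a database instead of an API; the
# connection string below is only a placeholder.
# prefect config set PREFECT_ORION_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:pass@db-host:5432/prefect"
```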
🙏 1
I don’t think we have a tutorial on this yet; cc @terrence, we can at least add a note to the Ray/Dask tutorial to start
🚀 1
c
The first point is referring to Prefect Cloud, I assume? So it would be a feature of that?
z
One advantage of Prefect Cloud is that we’ve set up the API somewhere for you and it’s always accessible
c
Alright got it
z
So you don’t have to worry about networking and such
👀 1
c
Thanks a lot to all of you for the extremely quick support here. I'll try it out
t
Good callout. The current Ray tutorial does not demonstrate using a non-local Ray instance. I happen to be in the process of reworking these docs so will note that we need to cover this scenario in more detail.
🙏 1