Laxmi
10/24/2024, 4:34 PM
RayTaskRunner.
• The docs do not mention enabling PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING for this to work with a distributed task runner, but it still needs to be enabled. The sequential and concurrent task runners work without enabling it, though. Is this correct, or have I overlooked something else?
• When using RayTaskRunner, I call my parent task with .submit() so that it uses the specified task runner. When I call another task from inside this task, should I use .submit() as well? If I call the child task with .submit(), it is not executed at all. If I call it without .submit(), it is executed, although its logs are only displayed in the console and not in the UI/Cloud. So if I am using RayTaskRunner and want the child task to use the same task runner, what should I do?
MRE
import time

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner

@task
def task_b():
    logger = get_run_logger()
    logger.info("Running Task B")
    time.sleep(100)

@task
def task_a():
    logger = get_run_logger()
    logger.info("Running Task A")
    for i in range(5):
        # task_b.submit()  # This does not work
        # The direct call below works, but its logs only appear in the terminal.
        # Does it run sequentially now, or on the specified task runner?
        task_b()

@flow(task_runner=RayTaskRunner)
def my_flow():
    task_a.submit()
Question 2:
Now, when tasks are executed and return Prefect futures, it somehow always tries to create a Prefect storage directory under my user's home directory, even when the deployment is run from the cloud or is a completely different deployment in the cloud executed by someone else entirely. I will add the stack trace in the thread.
I cannot figure out where it picks up that it needs to create a directory under my name. The only thing deployments created by me have in common with those created by others is the PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING env var. They run in Prefect Cloud on completely separate Ray clusters/environments.
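The thread does not establish where /home/laxmi comes from. The stack trace posted later shows a process whose files live under /home/ray writing to /home/laxmi/.prefect/storage/parameters through `storage_block.write_path`. One hypothesis worth checking, and it is only an assumption: a home-relative storage location was expanded to an absolute path on Laxmi's machine and then persisted (for example in a saved storage block), so every worker that later reads it targets /home/laxmi. The stdlib sketch below uses a hypothetical helper, `expand_for_user`, to show why expanding `~` eagerly behaves differently from expanding it where the path is used.

```python
# Hypothetical illustration: where a leading "~" gets expanded matters.
RAW_PATH = "~/.prefect/storage"


def expand_for_user(raw: str, home: str) -> str:
    """Expand a leading '~' against an explicit home directory (POSIX-style)."""
    if raw == "~" or raw.startswith("~/"):
        return home + raw[1:]
    return raw  # already absolute (or relative): left untouched


# Eager expansion: resolved once on the machine where the setting is saved.
saved_on_laptop = expand_for_user(RAW_PATH, "/home/laxmi")

# Lazy expansion: the raw string travels, and each worker expands it itself.
resolved_on_worker = expand_for_user(RAW_PATH, "/home/ray")

# Once a path has been expanded eagerly, the worker's own home no longer matters.
reused_on_worker = expand_for_user(saved_on_laptop, "/home/ray")

print(saved_on_laptop)     # /home/laxmi/.prefect/storage
print(resolved_on_worker)  # /home/ray/.prefect/storage
print(reused_on_worker)    # /home/laxmi/.prefect/storage
```

If this hypothesis holds, the fix would be wherever the absolute path is stored, not on the Ray workers themselves.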
Any pointers or suggestions are greatly appreciated. Thank you!
Laxmi
10/24/2024, 4:39 PM
Info
Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1175, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/home/laxmi/.prefect/storage/parameters'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1175, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/home/laxmi/.prefect/storage'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1175, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/home/laxmi/.prefect'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/engine.py", line 2155, in orchestrate_task_run
    result = await call.aresult()
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/ray/prefect-dev/liveeo/flows/tradeaware_analysis/raster_tiler/tasks.py", line 161, in create_tile_rasters_bulk
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/tasks.py", line 943, in submit
    return from_sync.wait_for_call_in_loop_thread(
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/home/ray/miniforge3/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/engine.py", line 2465, in create_autonomous_task_run
    await factory.store_parameters(parameters_id, parameters)
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/results.py", line 469, in store_parameters
    await self.storage_block.write_path(
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/filesystems.py", line 236, in write_path
    path.parent.mkdir(exist_ok=True, parents=True)
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1179, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1179, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1179, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1175, in mkdir
    self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/home/laxmi'
Error
Finished in state Failed("Task run encountered an exception PermissionError: [Errno 13] Permission denied: '/home/laxmi'")
I've only encountered this issue when trying to call a task from another task.
Will
10/25/2024, 7:30 AM
Nate
10/25/2024, 3:27 PM
Nate
10/25/2024, 3:29 PM
Nate
10/25/2024, 3:32 PM
"PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING for this to work when using distributed taskrunner but this still needs to be enabled for it to work."
I'm not sure that task scheduling is actually what you want here.
Task scheduling (or "background tasks") is a new execution protocol in which long-lived task workers are served, so that runs of those tasks can be submitted; the runs are pushed to the workers via websockets and executed in the background.
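A toy, stdlib-only model of the protocol described above may help frame the "not executed at all" symptom reported in this thread: a submitted run only sits in a queue until some long-lived worker is serving that queue. This is a sketch under stated assumptions, not Prefect's implementation: `ToyTaskWorker`, `submit` and `square` are hypothetical names, and a `queue.Queue` stands in for the websocket push.

```python
import queue
import threading
from typing import Any, Callable, List

# Toy model only (not Prefect's implementation): a long-lived worker is
# "served" and receives submitted runs from a queue, which stands in for
# the websocket push. All names here are hypothetical.

_STOP = object()  # sentinel that tells the serving loop to exit


class ToyTaskWorker:
    def __init__(self) -> None:
        self.inbox: "queue.Queue[Any]" = queue.Queue()
        self.results: List[Any] = []
        self._thread = threading.Thread(target=self._serve, daemon=True)

    def start(self) -> None:
        # Begin serving: from now on, queued runs are picked up in the background.
        self._thread.start()

    def _serve(self) -> None:
        while True:
            item = self.inbox.get()
            if item is _STOP:
                break
            fn, args = item
            self.results.append(fn(*args))

    def stop(self) -> None:
        # Queue the sentinel behind any pending runs, then wait for the loop to end.
        self.inbox.put(_STOP)
        self._thread.join()


def submit(worker: ToyTaskWorker, fn: Callable[..., Any], *args: Any) -> None:
    """Enqueue a run. Nothing executes unless a worker is serving the queue."""
    worker.inbox.put((fn, args))


def square(x: int) -> int:
    return x * x


worker = ToyTaskWorker()
for i in range(3):
    submit(worker, square, i)

# Runs are queued, but no worker is serving yet, so nothing has executed.
results_before_serving = list(worker.results)

worker.start()
worker.stop()
results_after_serving = list(worker.results)

print(results_before_serving)  # []
print(results_after_serving)   # [0, 1, 4]
```

In this model, submission succeeds without error even when nobody is serving, which is why a missing worker looks like a silent failure rather than an exception.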
I think you're looking to simply submit task runs to your Ray cluster via the typical .submit protocol, is that right?
Laxmi
10/28/2024, 10:52 AM
RuntimeError: If you meant to submit a background task, you need to set `prefect config set PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING=true` and use `your_task.submit()` instead of `your_task()`.
If we enable PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING, it no longer throws that error and lets us call the child task with .submit(). But I guess it then fails silently, since the child task is not executed at all. If we remove .submit(), those tasks run sequentially instead of using the specified RayTaskRunner. So I guess that with a distributed task runner, we can still call another task from within a task, but it is not executed by the specified task runner?
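The observation that dropping .submit() makes the child tasks run sequentially matches the general difference between calling a function directly and handing it to an executor. A stdlib analogy, assuming a `ThreadPoolExecutor` as a stand-in for a task runner (it is not how Prefect dispatches work to Ray); `child` and `parent` are hypothetical stand-ins for task_b and task_a:

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

# Stdlib analogy only: ThreadPoolExecutor stands in for a task runner.
# `child` and `parent` are hypothetical stand-ins for task_b and task_a.


def child(i: int) -> str:
    # Report which thread actually executed this call.
    return threading.current_thread().name


def parent(pool: Optional[ThreadPoolExecutor] = None) -> List[str]:
    if pool is None:
        # Direct calls: each child runs inline, one after another, in the caller's thread.
        return [child(i) for i in range(5)]
    # Submitted calls: children are handed to the executor's worker threads.
    futures = [pool.submit(child, i) for i in range(5)]
    return [f.result() for f in futures]


caller = threading.current_thread().name
inline_threads = parent()

with ThreadPoolExecutor(max_workers=2, thread_name_prefix="runner") as pool:
    submitted_threads = parent(pool)

print(set(inline_threads) == {caller})                         # True
print(all(n.startswith("runner") for n in submitted_threads))  # True
```

The direct path never touches the executor at all, so whatever runner the caller was configured with is simply bypassed.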
Another issue: when this env var is enabled, the flow run fails towards the end with the error posted above, complaining that it cannot create a directory at /home/laxmi/.prefect/storage. This happens even when the flow runs via Prefect Cloud on our Ray clusters in Anyscale cloud and is triggered by someone else. We have not been able to pinpoint what is causing this, or even where to look.