# prefect-integrations
Laxmi
Hello, I'm not sure whether this should be posted in #CL09KU1K7 or here, so please let me know if this is not the correct channel. Topic: calling a task from another task. Prefect version: >2.18.x, <3.x.x.

Question 1: Calling a task from another task when using a distributed task runner such as `RayTaskRunner`.

• The docs do not mention that `PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING` has to be enabled for this to work with a distributed task runner, but it still needs to be enabled. The sequential and concurrent task runners work without enabling it. Is this correct, or have I overlooked something else?

• When using `RayTaskRunner`, I call my parent task with `.submit()` so that it uses the specified task runner. When I call another task from inside this task, should I use `.submit()` as well? If I call the child task with `.submit()`, it is not executed at all. If I call it without `.submit()`, it is executed, but its logs are only displayed in the console and not in the UI/Cloud. So if I am using `RayTaskRunner` and want the second task to use the same task runner too, what should I do?

MRE:
```python
import time

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner


@task
def task_b():
    logger = get_run_logger()
    logger.info("Running Task B")
    time.sleep(100)


@task
def task_a():
    logger = get_run_logger()
    logger.info("Running Task A")
    for _ in range(5):
        # task_b.submit()  # This does not work: task_b is never executed
        # The direct call below works, but its log only shows in the terminal.
        # Does it run sequentially or via the specified task runner?
        task_b()


@flow(task_runner=RayTaskRunner)
def my_flow():
    task_a.submit()


if __name__ == "__main__":
    my_flow()
```
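Whether a direct call inside a submitted task runs "sequentially" can be pictured with a plain-Python analogy. The sketch below uses `concurrent.futures.ThreadPoolExecutor` as a hypothetical stand-in for a task runner; it is not Prefect or Ray code, and Prefect's real behavior may differ. The parent is submitted to the pool, and calling the child as a plain function runs it inline on the parent's worker thread, one call after another. The names `parent` and `child` are illustrative only.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def child(i: int) -> tuple[int, str]:
    # Plain function call: runs on whichever thread invokes it.
    time.sleep(0.01)  # simulate a bit of work
    return i, threading.current_thread().name


def parent() -> tuple[str, list[tuple[int, str]]]:
    # Calling child() directly executes it inline, one iteration at a time,
    # on the same worker thread that is running parent().
    results = [child(i) for i in range(5)]
    return threading.current_thread().name, results


# Stand-in for a task runner (analogy only): the parent is "submitted" to a pool.
with ThreadPoolExecutor(max_workers=4, thread_name_prefix="runner") as pool:
    parent_thread, child_results = pool.submit(parent).result()

print(parent_thread)
print(child_results)
```

In this analogy the five child calls never reach the pool, so they cannot run in parallel; only work handed to the runner itself is distributed across its workers.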
Question 2: When tasks are executed but return Prefect futures, Prefect somehow always tries to create a Prefect storage directory under my user's home directory, even when the deployment is run from Cloud, or when it is a completely different deployment in Cloud executed by someone else entirely. I will add the stack trace in the thread. I cannot figure out where it picks up that it needs to create a directory under my name. The only thing the deployments created by me and by others have in common is the `PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING` env var, nothing else. They run in Prefect Cloud with completely separate Ray clusters/environments. Any pointers or suggestions are greatly appreciated. Thank you!
Stack trace:
```
Info
Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1175, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/home/laxmi/.prefect/storage/parameters'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1175, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/home/laxmi/.prefect/storage'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1175, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/home/laxmi/.prefect'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/engine.py", line 2155, in orchestrate_task_run
    result = await call.aresult()
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/ray/prefect-dev/liveeo/flows/tradeaware_analysis/raster_tiler/tasks.py", line 161, in create_tile_rasters_bulk
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/tasks.py", line 943, in submit
    return from_sync.wait_for_call_in_loop_thread(
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/home/ray/miniforge3/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/engine.py", line 2465, in create_autonomous_task_run
    await factory.store_parameters(parameters_id, parameters)
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/results.py", line 469, in store_parameters
    await self.storage_block.write_path(
  File "/home/ray/miniforge3/lib/python3.10/site-packages/prefect/filesystems.py", line 236, in write_path
    path.parent.mkdir(exist_ok=True, parents=True)
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1179, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1179, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1179, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/miniforge3/lib/python3.10/pathlib.py", line 1175, in mkdir
    self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/home/laxmi'
Error
Finished in state Failed("Task run encountered an exception PermissionError: [Errno 13] Permission denied: '/home/laxmi'")
```
I've only encountered this issue when trying to call a task from another task.
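The traceback shows that the failing path comes from a storage block (`self.storage_block.write_path` in `prefect/filesystems.py`), that it is already an absolute path under `/home/laxmi` when the Ray worker receives it, and that `mkdir(parents=True)` walks up the parents until it hits `/home/laxmi`, which the worker's user is not permitted to create. One hypothesis, not confirmed anywhere in this thread, is that the block's base path was resolved from `~` on the machine where it was first created and then persisted, so every later run reuses that fixed string. The stdlib sketch below illustrates only that general difference, on POSIX: a `~`-relative path re-resolves for each user, while a stored absolute path does not. The `HOME` values, `resolve_for_user`, and `stored_basepath` are hypothetical.

```python
import os
from pathlib import Path


def resolve_for_user(home: str, raw_path: str) -> Path:
    """Resolve raw_path as if running under a user whose HOME is `home` (POSIX only)."""
    old_home = os.environ.get("HOME")
    os.environ["HOME"] = home
    try:
        return Path(raw_path).expanduser()
    finally:
        # Restore the caller's environment.
        if old_home is None:
            del os.environ["HOME"]
        else:
            os.environ["HOME"] = old_home


# Hypothetical: resolved once on the author's machine and saved as a plain string.
stored_basepath = str(resolve_for_user("/home/laxmi", "~/.prefect/storage"))

# Hypothetical Ray worker running with a different HOME:
print(resolve_for_user("/home/ray", "~/.prefect/storage"))  # re-resolved for this user
print(resolve_for_user("/home/ray", stored_basepath))       # still points under /home/laxmi
```

If the hypothesis holds, the shared piece would be the saved storage configuration rather than anything inside the separate Ray environments.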
Will
CC @Emil Christensen @Nate @Aimee McManus, thank you!
Nate
Hi @Will, you can email support@prefect.io or reach out to your contact at Prefect if you need something urgently.
@Laxmi, are you calling these tasks in a flow? Task scheduling (as opposed to `.submit` / `.map`, which have first-class support in 2.x) is experimental in 2.x, which is why it's not documented. Can you give some more color on the code you're running?

> The docs does not mention about enabling `PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING` for this to work when using distributed taskrunner but this still needs to be enabled for it to work.

I'm not sure that task scheduling is actually what you want here. Task scheduling (or "background tasks") is a new execution protocol in which long-lived task workers are served, so that runs of that task can be submitted; the workers are pushed those runs via websockets so they can run in the background. I think you're looking to simply submit task runs to your Ray cluster via the typical `.submit` protocol, is that right?
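The background-task protocol described above (long-lived task workers that are pushed runs so they execute in the background) can be pictured with a toy model. The sketch below is not Prefect's implementation or API: a `queue.Queue` stands in for the websocket push, a thread stands in for a served task worker, and `ToyTaskServer` and `square` are hypothetical names. It also shows one way to picture the "not executed at all" symptom from this thread: in this model, a run submitted while no worker is serving simply stays queued. Whether that is what actually happened here is not established in the thread.

```python
import queue
import threading


class ToyTaskServer:
    """Toy model only: runs are queued, and a long-lived worker (if any) executes them."""

    def __init__(self) -> None:
        self.pending: queue.Queue = queue.Queue()
        self.results: list = []
        self._worker = None

    def submit(self, fn, *args) -> None:
        # Submitting only enqueues a run; nothing executes here.
        self.pending.put((fn, args))

    def serve(self) -> None:
        # Start a long-lived worker that executes every run pushed to it, in order.
        def loop() -> None:
            while True:
                fn, args = self.pending.get()
                if fn is None:  # shutdown sentinel
                    return
                self.results.append(fn(*args))

        self._worker = threading.Thread(target=loop, daemon=True)
        self._worker.start()

    def stop(self) -> None:
        # Queue a sentinel behind any pending runs, then wait for the worker to exit.
        self.pending.put((None, ()))
        if self._worker is not None:
            self._worker.join()


def square(x: int) -> int:
    return x * x


server = ToyTaskServer()
server.submit(square, 3)
print(server.results, server.pending.qsize())  # no worker yet: [] 1

server.serve()  # a worker starts serving and drains the queue
server.submit(square, 4)
server.stop()
print(server.results)  # [9, 16]
```

The design point the model captures is that submission and execution are decoupled: `submit` succeeds whether or not anything is serving, which is different from handing work to a task runner that the flow itself manages.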
Laxmi
Hi @Nate, sorry for the mix-ups. Yes, that's correct. I am calling these tasks in a flow and was wondering how the "calling tasks from other tasks" feature works with a distributed task runner, or whether it even applies in this case. We use the `.submit()` method so that our tasks are executed by this runner instead of running sequentially, but calling a task with `.submit()` from another task throws an error like the one below:
```
RuntimeError:  If you meant to submit a background task, you need to set `prefect config set PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING=true` and use `your_task.submit()` instead of `your_task()`.
```
If we enable `PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING`, it no longer throws that error and lets us call the child task with `.submit()`, but I guess it then fails silently, since the child task is not executed at all. If we remove `.submit()`, those tasks run sequentially instead of using the specified `RayTaskRunner`. So with a distributed task runner, I guess we can still call another task from within a task, but it is not executed by the specified task runner?

Another issue: when this env var is enabled, the flow run fails towards the end with the error posted above, complaining that it cannot create a directory at `/home/laxmi/.prefect/storage`, even when the flow runs via Prefect Cloud on our Ray clusters in Anyscale cloud and was triggered by someone else. We are not able to pinpoint what is causing this, or even where to look.
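Nate's suggestion earlier in the thread was to use the typical `.submit` protocol from the flow. As a plain-Python analogy only (again `ThreadPoolExecutor` stands in for the task runner; this is not Prefect or Ray code, and whether Prefect behaves the same way is an assumption), the sketch below keeps every submission at the orchestrating level: the flow-like function submits the parent, waits for its result, and then submits each child to the same pool, so every child is handed to the runner instead of being called inline inside the parent. `parent_work`, `child_work`, and `flow_like` are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def parent_work() -> list[int]:
    # Hypothetical parent: decides which items the children should process.
    return [1, 2, 3, 4, 5]


def child_work(item: int) -> tuple[int, str]:
    # Hypothetical child: records which worker thread ran it.
    return item * 10, threading.current_thread().name


def flow_like() -> list[tuple[int, str]]:
    # All submissions happen here, at the orchestrating level, not inside a worker.
    with ThreadPoolExecutor(max_workers=3, thread_name_prefix="runner") as pool:
        items = pool.submit(parent_work).result()
        child_futures = [pool.submit(child_work, item) for item in items]
        return [f.result() for f in child_futures]


results = flow_like()
print([value for value, _ in results])  # [10, 20, 30, 40, 50]
```

The trade-off in this pattern is that the orchestrator must wait for the parent's result before fanning out, but in exchange every child goes through the same runner.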