k

Kha Nguyen

08/03/2022, 5:18 PM
I am trying to get my flow to run on a remote Ray (Anyscale) cluster. However, my flow fails whenever I try to submit a task. The error is that a Ray worker is trying to access something that is specific to my local machine (I am running directly on my local machine for development). I think this has something to do with local storage being used even in a distributed context. The docs for RayTaskRunner recommend using remote storage. I set up an S3 block, but I have no idea what I am supposed to do next. My flow is still using local storage.
from prefect import flow, task, get_run_logger
from prefect_ray import RayTaskRunner


@task
def generate_numbers(n: int) -> list[int]:
    return list(range(n))


@task
def double_the_numbers(d: int) -> int:
    return d * 2


@flow(
    task_runner=RayTaskRunner(
        address="anyscale://singularity/demo1",
        init_kwargs={"runtime_env": {"working_dir": "."}},
    )
)
def run_workflow():
    logger = get_run_logger()
    logger.info("Hello World!")

    numbers = generate_numbers.submit(100)
    logger.info(numbers.result())
    doubled = double_the_numbers.map(numbers)

    logger.info([d.wait().result() for d in doubled])


run_workflow()
ray::begin_task_run() (pid=20006, ip=xxx.xx.xx.xxx)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/ray/anaconda3/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/ray/anaconda3/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 946, in begin_task_run
    return await orchestrate_task_run(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 1083, in orchestrate_task_run
    await _persist_serialized_result(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/results.py", line 15, in _persist_serialized_result
    await filesystem.write_path(key, content)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/filesystems.py", line 162, in write_path
    path.parent.mkdir(exist_ok=True, parents=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
    self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'
@Taylor Curran @Kalise Richmond
c

Christopher Boyd

08/03/2022, 5:32 PM
Hi Kha, Looks like you are missing the imports for storage blocks and usage: https://docs.prefect.io/api-ref/prefect/filesystems/#prefect.filesystems.S3
It would look like this in your flow:
from prefect.filesystems import S3

s3_block = S3.load("BLOCK_NAME")
Also, would you mind condensing your code blocks into this thread, in lieu of multiple entries in the channel?
k

Kha Nguyen

08/03/2022, 5:53 PM
Oh my bad. I forgot to create a thread
Does it mean that, when I run S3.load, the filesystem will use S3 from then on?
I put that code in, and nothing changes
from prefect import flow, task, get_run_logger
from prefect_ray import RayTaskRunner
from prefect.filesystems import S3


@task
def generate_numbers(n: int) -> list[int]:
    return list(range(n))


@task
def double_the_numbers(d: int) -> int:
    return d * 2


@flow(
    task_runner=RayTaskRunner(
        address="anyscale://singularity/demo1",
        init_kwargs={"runtime_env": {"working_dir": "."}},
    )
)
def run_workflow():
    s3_block = S3.load("demo")
    logger = get_run_logger()
    logger.info("Hello World!")

    numbers = generate_numbers.submit(100)
    logger.info(numbers.result())
    doubled = double_the_numbers.map(numbers)

    logger.info([d.wait().result() for d in doubled])


run_workflow()
c

Christopher Boyd

08/03/2022, 6:35 PM
So there are a couple things: 1. I don’t think your s3_block needs to be in the flow definition. 2. Are you building a deployment then executing this from a deployment? 3. When you build and apply that deployment, are you passing the storage-block? https://orion-docs.prefect.io/concepts/storage/
k

Kha Nguyen

08/03/2022, 6:38 PM
No, this is run directly on my local computer. I will build a deployment when the flow is ready.
c

Christopher Boyd

08/03/2022, 6:39 PM
If you are running this locally on your computer without a deployment, the storage block is not being used
k

Kha Nguyen

08/03/2022, 6:40 PM
So the only way to use block storage, is to run from a deployment, right?
Is there a way to tell my local process to use the block storage, because I think my local machine is simply acting as an agent?
c

Christopher Boyd

08/03/2022, 6:42 PM
For the RayTaskRunner, I think the assumption here is that you have specified your storage block on the deployment where the flow will be packaged.
What is this address?
address="anyscale://singularity/demo1",
I think if your intent is to run this locally, then the address should be removed to spin up a local instance. If the intent is to point to another instance, then a storage block should be configured with the deployment.
k

Kha Nguyen

08/03/2022, 7:38 PM
Anyscale for Ray is similar to Coiled for Dask. Whenever I init a Ray cluster, pointing to this anyscale address, then Anyscale will provision a cluster for me.
My intention is to run locally while using a remote Ray cluster.
During development, I do not want to make a new deployment after every minor adjustment. I still want to use the power of distributed Ray, however.
I just made a deployment, for the sake of using the remote file system (S3 block), and I am still seeing the same error.
23:10:48.652 | INFO    | prefect.agent - Submitting flow run 'b6b7609b-3936-4b37-a70a-2ef069c9a3ad'
23:10:49.243 | INFO    | prefect.infrastructure.process - Opening process 'literate-honeybee'...
23:10:49.262 | INFO    | prefect.agent - Completed submission of flow run 'b6b7609b-3936-4b37-a70a-2ef069c9a3ad'
23:10:51.796 | INFO    | Flow run 'literate-honeybee' - Starting 'RayTaskRunner'; submitted tasks will be run in parallel...
23:10:51.797 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at anyscale://singularity/demo1
Authenticating
Loaded Anyscale authentication token from ~/.anyscale/credentials.json.

Parsing Ray Client arguments
[Warning] No working_dir specified! Files will only be uploaded to the cluster if a working_dir is provided or a project is detected. In the future, files will only be uploaded if working_dir is provided. To ensure files continue being imported going forward, set the working_dir in your runtime environment. See https://docs.ray.io/en/latest/handling-dependencies.html#runtime-environments.
Finished parsing arguments.

Choosing a project
Using the project singularity:
  name:               singularity
  project id:         prj_9dVsTxDmpWhannc9eLbHDaPv

Preparing the cluster
Cluster demo1 is currently running.
Connecting to this cluster:
  cluster id:                   ses_PXJLgBvwGrAM4DaFij4MaGdM
  cluster environment:          apt_ufz5Tj2dgkhiZXVmHvqR48DH:3
  cluster environment id:       bld_ne2CdBTrfiUBsMtLqpW1XxUw
  cluster compute:              singularity
  cluster compute id:           cpt_T4DnCqrTzVrqnhq4FUdmWfyw
  idle termination:             120 minutes
  maximum uptime:               disabled
  link:                         https://console.anyscale.com/projects/prj_9dVsTxDmpWhannc9eLbHDaPv/clusters/ses_PXJLgBvwGrAM4DaFij4MaGdM

Starting the interactive session
Connected to demo1.
Interactive session engine.py_08-03-2022_20:10:58 has started.
  interactive session id:       job_3etBZ5DVnCp3jmmsYR82U4k7
  runtime environment:          https://console.anyscale.com/configurations/runtime-env/rte_HibSUB296PTaRr3m7XBPvJ5i
  link:                         https://console.anyscale.com/interactive-sessions/xxx
23:11:19.847 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
23:11:19.847 | INFO    | prefect.task_runner.ray - The Ray UI is available at https://xxx
23:11:21.020 | INFO    | Flow run 'literate-honeybee' - Hello World!
23:11:21.344 | INFO    | Flow run 'literate-honeybee' - Created task run 'generate_numbers-92c16399-0' for task 'generate_numbers'
23:11:22.566 | INFO    | Flow run 'literate-honeybee' - Submitted task run 'generate_numbers-92c16399-0' for execution.
23:11:24.210 | ERROR   | Flow run 'literate-honeybee' - Encountered exception during execution:
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "./demo1.py", line 27, in run_workflow
    logger.info(numbers.result())
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/futures.py", line 209, in result
    return sync(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 221, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/futures.py", line 220, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect_ray/task_runners.py", line 172, in wait
    result = await ref
ray.exceptions.RayTaskError: ray::begin_task_run() (pid=2225, ip=172.31.38.127)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha/.prefect/storage'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=2225, ip=172.31.38.127)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha/.prefect'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=2225, ip=172.31.38.127)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=2225, ip=172.31.38.127)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/ray/anaconda3/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/ray/anaconda3/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 946, in begin_task_run
    return await orchestrate_task_run(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 1083, in orchestrate_task_run
    await _persist_serialized_result(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/results.py", line 15, in _persist_serialized_result
    await filesystem.write_path(key, content)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/filesystems.py", line 162, in write_path
    path.parent.mkdir(exist_ok=True, parents=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
    self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'
I wonder if we are talking about the same thing. The S3 block that you referred to seems to be used to store the Python code to be executed.
My issue at the moment is why this simple task cannot run on a Ray cluster. We have a task, as simple as it possibly can be, and as soon as a submit request is made, it starts to look for something on my local computer ‘/Users/kha/.prefect/storage’.
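Note on the traceback above: the chain of FileNotFoundError entries ending in a PermissionError on '/Users' matches how a recursive mkdir behaves. The following is a minimal sketch using only the standard library, not Prefect code; the helper name mkdir_walk is hypothetical. It lists the directories that Path.mkdir(parents=True) attempts for the storage path in the error, deepest first. On the Ray worker none of them exist, and the last one cannot be created directly under the filesystem root.

```python
from pathlib import PurePosixPath


def mkdir_walk(directory: str) -> list[str]:
    """Return the directories a recursive mkdir visits, deepest first.

    Path.mkdir(parents=True) tries the target first and, on
    FileNotFoundError, recurses into each parent in turn. The filesystem
    root is left out because it always exists.
    """
    target = PurePosixPath(directory)
    root = PurePosixPath(target.anchor)
    return [str(p) for p in (target, *target.parents) if p != root]


# The path below is the one reported in the traceback.
walk = mkdir_walk("/Users/kha/.prefect/storage")
for step in walk:
    print(step)
```

The last entry printed is the directory the worker finally tries, and fails, to create.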
For completeness, here is how the deployment was made:
prefect deployment build ./demo1.py:run_workflow -n demo -t demo -i process -sb s3/demo
prefect deployment apply run_workflow-deployment.yaml
###
### A complete description of a Prefect Deployment for flow 'run-workflow'
###
name: demo
description: a demo deployment
tags:
- demo
parameters: {}
schedule: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command:
  - python
  - -m
  - prefect.engine
  stream_output: true
###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: run-workflow
manifest_path: run_workflow-manifest.json
storage:
  bucket_path: xxx
  aws_access_key_id: '**********'
  aws_secret_access_key: '**********'
  _is_anonymous: true
  _block_document_name: anonymous-768c2c8b-7b65-4281-b6b4-c549446cf49a
  _block_document_id: 262d5c8e-e649-4aa2-afa3-ee17a8813efa
  _block_type_slug: s3
parameter_openapi_schema:
  title: Parameters
  type: object
  properties: {}
  required: null
  definitions: null
a

Andrew Huang

08/04/2022, 1:53 AM
Are you able to run this, without providing any address keywords?
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)
Or drop init_kwargs={"runtime_env": {"working_dir": "."}} in your example.
k

Kha Nguyen

08/04/2022, 7:53 AM
It works when I run everything locally, which was my original result. Then when I moved to Prefect 2.0 Cloud, there was an issue with importing “packaging.requirements”, even though I had installed the “packaging” package. Then I moved to a remote Ray cluster (which is how we run in production; a local toy example working does not make it production ready). When a task is submitted, I have traced the issue to the line of code where a result is being persisted: it still uses LocalFileSystem. I think this problem would be solved if we used a remote file system to persist task outputs. However, in the Prefect documentation, "storage" or "filesystem" is used to describe both storage for flow code and storage for task output, which is confusing. The documentation then goes on to say that we should use a remote filesystem in a distributed computing context, and then shows an example of creating an S3 block to store flow code 🤷‍♂️.
What I really need at this time, is to tell the flow, when running, to use S3 filesystem instead of LocalFileSystem. I hope someone @Taylor Curran @Kalise Richmond @Christopher Boyd can answer if this is possible (a yes/no question) at the moment.
In case you are wondering, this line of code was hit when running using remote Ray cluster
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/filesystems.py", line 162, in write_path
    path.parent.mkdir(exist_ok=True, parents=True)
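Note: the distinction drawn in this message (storage for flow code versus storage for task results) can be illustrated with a toy model. This is only a sketch under stated assumptions: LocalStorage, RemoteStorage, persist_result, and the directory sets are hypothetical stand-ins, not Prefect classes or its actual persistence logic. The point it shows is that a local base path resolved on the laptop is only valid on the laptop, while an object-store location is valid from any machine.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LocalStorage:
    """Toy stand-in for a local-filesystem block (hypothetical)."""

    basepath: str  # absolute path resolved on the machine that created it

    def write(self, key: str, machine_dirs: set[str]) -> str:
        # A local path only works where that directory actually exists.
        if self.basepath not in machine_dirs:
            raise FileNotFoundError(self.basepath)
        return f"{self.basepath}/{key}"


@dataclass(frozen=True)
class RemoteStorage:
    """Toy stand-in for an object-store block such as S3 (hypothetical)."""

    bucket_path: str

    def write(self, key: str, machine_dirs: set[str]) -> str:
        # An object store is addressed by URL, so the directory layout
        # of the machine doing the write does not matter.
        return f"s3://{self.bucket_path}/{key}"


def persist_result(storage, key: str, machine_dirs: set[str]) -> str:
    """Persist a task result from a given machine; return where it went."""
    return storage.write(key, machine_dirs)


# Directory layouts are assumptions for illustration only.
LAPTOP_DIRS = {"/Users/kha/.prefect/storage"}
WORKER_DIRS = {"/home/ray"}
```

In this model, persisting through LocalStorage("/Users/kha/.prefect/storage") succeeds with LAPTOP_DIRS and raises FileNotFoundError with WORKER_DIRS, while RemoteStorage succeeds with either.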
c

Christopher Boyd

08/04/2022, 11:58 AM
Hi Kha, just so I’m following, your initial issue was:
self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'
This resulted from using a remote Ray Anyscale cluster without a remote storage block, so it did not have access to your local filesystem. Now, your latest update is that it does run successfully locally, but does not run remotely in Anyscale when using a storage block. Is that accurate?
k

Kha Nguyen

08/04/2022, 1:44 PM
Hi Christopher, yes, that is accurate.
c

Christopher Boyd

08/04/2022, 2:47 PM
What is the full stack trace of the latest error you are receiving, and the command you used to register the deployment / storage block?
k

Kha Nguyen

08/05/2022, 8:25 AM
The S3 storage block was created in the UI, named demo. The deployment was made as follows:
prefect deployment build ./demo1.py:run_workflow -n demo -t local -i process -sb s3/demo
prefect deployment apply run_workflow-deployment.yaml
Full stack trace, raised as soon as a task is submitted:
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "./demo1.py", line 27, in run_workflow
    logger.info(numbers.result())
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/futures.py", line 209, in result
    return sync(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 221, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/futures.py", line 220, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect_ray/task_runners.py", line 172, in wait
    result = await ref
ray.exceptions.RayTaskError: ray::begin_task_run() (pid=2225, ip=172.31.38.127)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha/.prefect/storage'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=2225, ip=172.31.38.127)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha/.prefect'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=2225, ip=172.31.38.127)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=2225, ip=172.31.38.127)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/ray/anaconda3/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/ray/anaconda3/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 946, in begin_task_run
    return await orchestrate_task_run(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 1083, in orchestrate_task_run
    await _persist_serialized_result(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/results.py", line 15, in _persist_serialized_result
    await filesystem.write_path(key, content)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/filesystems.py", line 162, in write_path
    path.parent.mkdir(exist_ok=True, parents=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
    self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'