Kha Nguyen
08/03/2022, 5:18 PM
from prefect import flow, task, get_run_logger
from prefect_ray import RayTaskRunner
@task
def generate_numbers(n: int) -> list[int]:
    """Return the integers ``0`` through ``n - 1`` as a list."""
    return [*range(n)]
@task
def double_the_numbers(d: int) -> int:
    """Return ``d`` multiplied by two."""
    doubled = d * 2
    return doubled
@flow(
    task_runner=RayTaskRunner(
        # Ray client address of the target cluster; the URL itself carries no
        # surrounding angle brackets.
        address="anyscale://singularity/demo1",
        init_kwargs={"runtime_env": {"working_dir": "."}},
    )
)
def run_workflow():
    """Generate 100 numbers, double each one in a mapped task, and log the results.

    Tasks are submitted through the ``RayTaskRunner`` configured in the
    ``@flow`` decorator above.
    """
    logger = get_run_logger()
    logger.info("Hello World!")

    # Submit the generator task and block until its result is available.
    numbers = generate_numbers.submit(100)
    logger.info(numbers.result())

    # Fan out one doubling task per generated number, then collect every result.
    doubled = double_the_numbers.map(numbers)
    logger.info([d.wait().result() for d in doubled])


run_workflow()
ray::begin_task_run() (pid=20006, ip=xxx.xx.xx.xxx)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/home/ray/anaconda3/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/ray/anaconda3/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 946, in begin_task_run
return await orchestrate_task_run(
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 1083, in orchestrate_task_run
await _persist_serialized_result(
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/results.py", line 15, in _persist_serialized_result
await filesystem.write_path(key, content)
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/filesystems.py", line 162, in write_path
path.parent.mkdir(exist_ok=True, parents=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
self.parent.mkdir(parents=True, exist_ok=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
self.parent.mkdir(parents=True, exist_ok=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
self.parent.mkdir(parents=True, exist_ok=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'
@Taylor Curran @Kalise Richmond
Christopher Boyd
08/03/2022, 5:32 PM
from prefect.filesystems import S3
s3_block = S3.load("BLOCK_NAME")
Kha Nguyen
08/03/2022, 5:53 PM
from prefect import flow, task, get_run_logger
from prefect_ray import RayTaskRunner
from prefect.filesystems import S3
@task
def generate_numbers(n: int) -> list[int]:
    """Build the list ``[0, 1, ..., n - 1]``."""
    return [*range(n)]
@task
def double_the_numbers(d: int) -> int:
    """Return twice the value of ``d``."""
    result = d * 2
    return result
@flow(
    task_runner=RayTaskRunner(
        # Ray client address of the target cluster; the URL itself carries no
        # surrounding angle brackets.
        address="anyscale://singularity/demo1",
        init_kwargs={"runtime_env": {"working_dir": "."}},
    )
)
def run_workflow():
    """Generate 100 numbers, double each one in a mapped task, and log the results.

    Tasks are submitted through the ``RayTaskRunner`` configured in the
    ``@flow`` decorator above.
    """
    # NOTE(review): the loaded block is never referenced again in this flow, so
    # loading it presumably does not by itself change where task results are
    # persisted — confirm how result storage is meant to be configured.
    s3_block = S3.load("demo")

    logger = get_run_logger()
    logger.info("Hello World!")

    # Submit the generator task and block until its result is available.
    numbers = generate_numbers.submit(100)
    logger.info(numbers.result())

    # Fan out one doubling task per generated number, then collect every result.
    doubled = double_the_numbers.map(numbers)
    logger.info([d.wait().result() for d in doubled])


run_workflow()
Christopher Boyd
08/03/2022, 6:35 PM
Kha Nguyen
08/03/2022, 6:38 PM
Christopher Boyd
08/03/2022, 6:39 PM
Kha Nguyen
08/03/2022, 6:40 PM
Christopher Boyd
08/03/2022, 6:42 PM
address="anyscale://singularity/demo1",
Kha Nguyen
08/03/2022, 7:38 PM
23:10:48.652 | INFO | prefect.agent - Submitting flow run 'b6b7609b-3936-4b37-a70a-2ef069c9a3ad'
23:10:49.243 | INFO | prefect.infrastructure.process - Opening process 'literate-honeybee'...
23:10:49.262 | INFO | prefect.agent - Completed submission of flow run 'b6b7609b-3936-4b37-a70a-2ef069c9a3ad'
23:10:51.796 | INFO | Flow run 'literate-honeybee' - Starting 'RayTaskRunner'; submitted tasks will be run in parallel...
23:10:51.797 | INFO | prefect.task_runner.ray - Connecting to an existing Ray instance at <anyscale://singularity/demo1>
Authenticating
Loaded Anyscale authentication token from ~/.anyscale/credentials.json.
Parsing Ray Client arguments
[Warning] No working_dir specified! Files will only be uploaded to the cluster if a working_dir is provided or a project is detected. In the future, files will only be uploaded if working_dir is provided. To ensure files continue being imported going forward, set the working_dir in your runtime environment. See <https://docs.ray.io/en/latest/handling-dependencies.html#runtime-environments>.
Finished parsing arguments.
Choosing a project
Using the project singularity:
name: singularity
project id: prj_9dVsTxDmpWhannc9eLbHDaPv
Preparing the cluster
Cluster demo1 is currently running.
Connecting to this cluster:
cluster id: ses_PXJLgBvwGrAM4DaFij4MaGdM
cluster environment: apt_ufz5Tj2dgkhiZXVmHvqR48DH:3
cluster environment id: bld_ne2CdBTrfiUBsMtLqpW1XxUw
cluster compute: singularity
cluster compute id: cpt_T4DnCqrTzVrqnhq4FUdmWfyw
idle termination: 120 minutes
maximum uptime: disabled
link: <https://console.anyscale.com/projects/prj_9dVsTxDmpWhannc9eLbHDaPv/clusters/ses_PXJLgBvwGrAM4DaFij4MaGdM>
Starting the interactive session
Connected to demo1.
Interactive session engine.py_08-03-2022_20:10:58 has started.
interactive session id: job_3etBZ5DVnCp3jmmsYR82U4k7
runtime environment: <https://console.anyscale.com/configurations/runtime-env/rte_HibSUB296PTaRr3m7XBPvJ5i>
link: <https://console.anyscale.com/interactive-sessions/xxx>
23:11:19.847 | INFO | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
23:11:19.847 | INFO | prefect.task_runner.ray - The Ray UI is available at <https://xxx>
23:11:21.020 | INFO | Flow run 'literate-honeybee' - Hello World!
23:11:21.344 | INFO | Flow run 'literate-honeybee' - Created task run 'generate_numbers-92c16399-0' for task 'generate_numbers'
23:11:22.566 | INFO | Flow run 'literate-honeybee' - Submitted task run 'generate_numbers-92c16399-0' for execution.
23:11:24.210 | ERROR | Flow run 'literate-honeybee' - Encountered exception during execution:
Traceback (most recent call last):
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "./demo1.py", line 27, in run_workflow
logger.info(numbers.result())
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/futures.py", line 209, in result
return sync(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 221, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/futures.py", line 220, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect_ray/task_runners.py", line 172, in wait
result = await ref
ray.exceptions.RayTaskError: ray::begin_task_run() (pid=2225, ip=172.31.38.127)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha/.prefect/storage'
During handling of the above exception, another exception occurred:
ray::begin_task_run() (pid=2225, ip=172.31.38.127)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha/.prefect'
During handling of the above exception, another exception occurred:
ray::begin_task_run() (pid=2225, ip=172.31.38.127)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha'
During handling of the above exception, another exception occurred:
ray::begin_task_run() (pid=2225, ip=172.31.38.127)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/home/ray/anaconda3/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/ray/anaconda3/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 946, in begin_task_run
return await orchestrate_task_run(
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 1083, in orchestrate_task_run
await _persist_serialized_result(
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/results.py", line 15, in _persist_serialized_result
await filesystem.write_path(key, content)
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/filesystems.py", line 162, in write_path
path.parent.mkdir(exist_ok=True, parents=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
self.parent.mkdir(parents=True, exist_ok=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
self.parent.mkdir(parents=True, exist_ok=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
self.parent.mkdir(parents=True, exist_ok=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'
prefect deployment build ./demo1.py:run_workflow -n demo -t demo -i process -sb s3/demo
prefect deployment apply run_workflow-deployment.yaml
###
### A complete description of a Prefect Deployment for flow 'run-workflow'
###
name: demo
description: a demo deployment
tags:
- demo
parameters: {}
schedule: null
# The flow run is launched as a process using the command listed below.
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command:
  - python
  - -m
  - prefect.engine
  stream_output: true
###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: run-workflow
manifest_path: run_workflow-manifest.json
# Anonymous S3 block (credentials masked).
storage:
  bucket_path: xxx
  aws_access_key_id: '**********'
  aws_secret_access_key: '**********'
  _is_anonymous: true
  _block_document_name: anonymous-768c2c8b-7b65-4281-b6b4-c549446cf49a
  _block_document_id: 262d5c8e-e649-4aa2-afa3-ee17a8813efa
  _block_type_slug: s3
parameter_openapi_schema:
  title: Parameters
  type: object
  properties: {}
  required: null
  definitions: null
Andrew Huang
08/04/2022, 1:53 AM
import time
from prefect import flow, task
from prefect_ray import RayTaskRunner
@task
def shout(number):
    """Pause for half a second, then print ``number`` prefixed with ``#``."""
    delay_seconds = 0.5
    time.sleep(delay_seconds)
    print(f"#{number}")
@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    """Submit one ``shout`` task for each integer below ``highest_number``."""
    for value in range(highest_number):
        shout.submit(value)
if __name__ == "__main__":
    # Run the flow only when this file is executed as a script, not on import.
    count_to(highest_number=10)
Or drop init_kwargs={"runtime_env": {"working_dir": "."}} in your example.
Kha Nguyen
08/04/2022, 7:53 AM
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/filesystems.py", line 162, in write_path
path.parent.mkdir(exist_ok=True, parents=True)
Christopher Boyd
08/04/2022, 11:58 AM
self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'
This resulted from using a remote Ray cluster on Anyscale without a remote storage block; the cluster did not have access to your local filesystem.
Now, your latest report is that it does run successfully locally, but does not run remotely on Anyscale when using a storage block. Is that accurate?
Kha Nguyen
08/04/2022, 1:44 PM
Christopher Boyd
08/04/2022, 2:47 PM
Kha Nguyen
08/05/2022, 8:25 AM
demo.
The deployment was made as follows:
prefect deployment build ./demo1.py:run_workflow -n demo -t local -i process -sb s3/demo
prefect deployment apply run_workflow-deployment.yaml
Traceback (most recent call last):
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "./demo1.py", line 27, in run_workflow
logger.info(numbers.result())
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/futures.py", line 209, in result
return sync(
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 221, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/futures.py", line 220, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect_ray/task_runners.py", line 172, in wait
result = await ref
ray.exceptions.RayTaskError: ray::begin_task_run() (pid=2225, ip=172.31.38.127)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha/.prefect/storage'
During handling of the above exception, another exception occurred:
ray::begin_task_run() (pid=2225, ip=172.31.38.127)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha/.prefect'
During handling of the above exception, another exception occurred:
ray::begin_task_run() (pid=2225, ip=172.31.38.127)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/kha'
During handling of the above exception, another exception occurred:
ray::begin_task_run() (pid=2225, ip=172.31.38.127)
File "/opt/homebrew/Caskroom/miniforge/base/envs/singularity2/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/home/ray/anaconda3/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/ray/anaconda3/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/home/ray/anaconda3/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 946, in begin_task_run
return await orchestrate_task_run(
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/engine.py", line 1083, in orchestrate_task_run
await _persist_serialized_result(
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/results.py", line 15, in _persist_serialized_result
await filesystem.write_path(key, content)
File "/home/ray/anaconda3/lib/python3.10/site-packages/prefect/filesystems.py", line 162, in write_path
path.parent.mkdir(exist_ok=True, parents=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
self.parent.mkdir(parents=True, exist_ok=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
self.parent.mkdir(parents=True, exist_ok=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1177, in mkdir
self.parent.mkdir(parents=True, exist_ok=True)
File "/home/ray/anaconda3/lib/python3.10/pathlib.py", line 1173, in mkdir
self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'