Jason Bertman
10/11/2022, 4:40 PM
Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::begin_task_run() (pid=77, ip=10.80.9.219)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1191, in orchestrate_task_run
    state = await propose_state(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1496, in propose_state
    raise prefect.exceptions.Abort(response.details.reason)
prefect.exceptions.Abort: This run cannot transition to the RUNNING state from the RUNNING state.

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=77, ip=10.80.9.219)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1121, in begin_task_run
    task_run.state.data._cache_data(await _retrieve_result(task_run.state))
AttributeError: 'NoneType' object has no attribute '_cache_data'
It seems like the engine is mistaking a running task run for one that hasn't started yet?
Mason Menges
10/11/2022, 5:03 PM
Jason Bertman
10/11/2022, 5:20 PM
from typing import List
from prefect import flow
from tasks.shell import shell_run_json
from tasks.collectors import service
from prefect_ray.task_runners import RayTaskRunner


def get_map_results(futures):
    # Block on each future and collect its result.
    return [f.result() for f in futures]


@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    ),
)
def main(targets: List, pstr: str = service.get_defaults()):
    # Fan out one shell task per target.
    n_futures = shell_run_json.map(
        command=[
            service.get_command(target=target, pstr=pstr)
            for target in targets
        ]
    )
    n_result = get_map_results(n_futures)
    # Drop exceptions and None values from the mapped results.
    some_parsed_data = list(
        filter(
            lambda x: not isinstance(x, (BaseException, type(None))),
            get_map_results(
                service.get_results.map(n_result)
            ),
        )
    )
    service.parse_results(n_result)
    # ... do more things, generate more tasks
In general, shell_run_json is just like Prefect 1's ShellTask, except that it enforces JSON output. You can see there's basically a command generator there (get_command).
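For readers who have not seen that pattern, here is a minimal sketch of a shell runner that enforces JSON output. It is not the actual shell_run_json (its implementation is not shown in this thread); the name run_json_command and the timeout parameter are hypothetical, and it uses only the standard library rather than a Prefect task.

```python
import json
import subprocess
from typing import Any


def run_json_command(command: str, timeout: float = 30.0) -> Any:
    """Run a shell command and return its stdout parsed as JSON.

    Hypothetical stand-in for shell_run_json, for illustration only.
    """
    completed = subprocess.run(
        command,
        shell=True,           # the command is a single shell string
        capture_output=True,  # collect stdout and stderr
        text=True,            # decode output to str
        timeout=timeout,
        check=True,           # a non-zero exit raises CalledProcessError
    )
    try:
        return json.loads(completed.stdout)
    except json.JSONDecodeError as exc:
        # Enforce the contract: anything that is not valid JSON is an error.
        raise ValueError(
            f"command did not emit valid JSON: {command!r}"
        ) from exc
```

Wrapping a function like this in a task would give each mapped command the same contract: either parsed JSON comes back or the task run fails.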
It seems to happen when f.result() is called, so maybe there's a better way to gather mapped results in Orion? If I catch exceptions there (in get_map_results), will there be a hanging task?
Andrew Huang
10/11/2022, 5:47 PM
get_map_results()
What if you wrapped some_parsed_data inside a task, e.g.:
from prefect import task

@task
def parse_data(n_results):
    # Drop exceptions and None values from the mapped results.
    return list(
        filter(
            lambda x: not isinstance(x, (BaseException, type(None))),
            get_map_results(
                service.get_results.map(n_results)
            ),
        )
    )
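The gather-and-filter idea discussed above can be sketched with the standard library alone. This is an illustration under assumptions, not Prefect code: gather_results, drop_failures, and parse are hypothetical names, and concurrent.futures stands in for the task runner. In Prefect 2, PrefectFuture.result takes a raise_on_failure argument that should give similar "return the exception instead of raising" behaviour, but check that against the version in use.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Iterable, List


def gather_results(futures: Iterable[Future]) -> List[Any]:
    """Collect each future's value, or the exception it raised, in input order."""
    results = []
    for future in futures:
        try:
            results.append(future.result())
        except Exception as exc:
            # Keep the failure as data so one bad item does not abort the gather.
            results.append(exc)
    return results


def drop_failures(results: Iterable[Any]) -> List[Any]:
    """Same filter as in the thread: discard exceptions and None values."""
    return [
        r for r in results
        if not isinstance(r, (BaseException, type(None)))
    ]


def parse(value: int) -> int:
    """Hypothetical unit of work that fails on negative input."""
    if value < 0:
        raise ValueError("negative input")
    return value * 2


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(parse, v) for v in (1, -1, 3)]
    gathered = gather_results(futures)

cleaned = drop_failures(gathered)
```

Every future is awaited before filtering, so nothing is left pending when one item fails; the failed item simply shows up as an exception object in the gathered list.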
Jason Bertman
10/12/2022, 12:33 PM
Andrew Huang
10/12/2022, 6:24 PM