
Jason Bertman

10/11/2022, 4:40 PM
I have a k8s cluster executing some pretty large flows on Orion via the RayTaskRunner. I have a remote Ray cluster deployed with the KubeRay operator and an autoscaler running on the head node. It scales in and out properly, but this morning, after about 23K task runs, I'm seeing this:
Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::begin_task_run() (pid=77, ip=10.80.9.219)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1191, in orchestrate_task_run
    state = await propose_state(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1496, in propose_state
    raise prefect.exceptions.Abort(response.details.reason)
prefect.exceptions.Abort: This run cannot transition to the RUNNING state from the RUNNING state.

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=77, ip=10.80.9.219)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1121, in begin_task_run
    task_run.state.data._cache_data(await _retrieve_result(task_run.state))
AttributeError: 'NoneType' object has no attribute '_cache_data'
It seems like the engine is mistaking a task run for not running yet?

Mason Menges

10/11/2022, 5:03 PM
Hey @Jason Bertman, would you be able to provide a minimal example of the flow you're running? Nothing immediately comes to mind as to what could be causing this.

Jason Bertman

10/11/2022, 5:20 PM
Kind of hard to show what's going on behind the scenes, as there's quite a bit of custom stuff going on... I realize it's a bit convoluted:
from typing import List
from prefect import flow

from tasks.shell import shell_run_json
from tasks.collectors import service
from prefect_ray.task_runners import RayTaskRunner

def get_map_results(futures):
    # block on each mapped future and collect its resolved result
    return [f.result() for f in futures]

@flow(
    task_runner=RayTaskRunner(
        address="<ray://ray-cluster-kuberay-head-svc:10001>"
    ),
)
def main(targets: List, pstr: str = service.get_defaults()):
    n_futures = shell_run_json.map(
        command=[
            service.get_command(target=target, pstr=pstr)
            for target in targets
        ]
    )
    n_result = get_map_results(n_futures)
    some_parsed_data = list(
        filter(
            lambda x: not isinstance(x, (BaseException, type(None))),
            get_map_results(
                service.get_results.map(n_result)
            ),
        )
    )
    service.parse_results(n_result)
    # ... do more things, generate more tasks
But in general, shell_run_json is just like Prefect 1's ShellTask, except it enforces JSON outputs. You can see there's basically a command generator there (get_command). It seems that the error happens when f.result() is called. So maybe there's a better way to gather mapped results in Orion? If I catch exceptions there (in get_map_results), will there be a hanging task?
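(For reference, a minimal sketch of one way to gather mapped results without raising, assuming this Prefect 2 release's PrefectFuture.result accepts raise_on_failure; get_map_results_safe is a hypothetical helper, not from the thread:)
def get_map_results_safe(futures):
    # raise_on_failure=False returns the exception object instead of raising,
    # so failed mapped runs still finish in a Failed state and nothing hangs;
    # the flow can filter the exceptions out afterwards
    return [f.result(raise_on_failure=False) for f in futures]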

Andrew Huang

10/11/2022, 5:47 PM
Rather than a custom get_map_results(), what if you wrapped some_parsed_data inside a task? e.g.
@task
def parse_data(n_results):
    return list(
        filter(
            lambda x: not isinstance(x, (BaseException, type(None))),
            get_map_results(
                service.get_results.map(n_results)
            ),
        )
    )

Jason Bertman

10/12/2022, 12:33 PM
One part that was a bit unclear was the difference between v1 and v2 in terms of the map-reduce functionality. The only reason I was using get_map_results was because my top-level flow was always going to be passing around futures. But it seems that you can pass mapped futures to other tasks, and they will resolve before being executed?

Andrew Huang

10/12/2022, 6:24 PM
Yes, they should resolve before being executed.
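(A minimal sketch of that pattern, with hypothetical placeholder tasks double_it and summarize that are not from the thread; Prefect 2 resolves the mapped futures before the downstream task body runs:)
from prefect import flow, task

@task
def double_it(x: int) -> int:
    return x * 2

@task
def summarize(values: list) -> int:
    # by the time this body runs, `values` is a plain list of ints --
    # the upstream mapped futures have already been resolved
    return sum(values)

@flow
def demo(numbers: list):
    futures = double_it.map(numbers)
    # pass the list of futures straight to the downstream task;
    # no manual f.result() gathering is needed
    return summarize(futures)

if __name__ == "__main__":
    demo([1, 2, 3])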