Santiago Gonzalez

04/11/2023, 3:05 PM
Hi, I have a flow that runs a Databricks job and, once it has finished, runs a shell command by calling the shell_run_command task. When this task is called, I get the following error:
Encountered exception during execution:
Traceback (most recent call last):
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "flows/flows/landbaron.py", line 149, in generate_lb_sites_flow
    ps_details_timestamp = shell_run_command(command=get_ps_details_timestamp_command(export_version))
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/tasks.py", line 491, in __call__
    mapped=False,
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
    return await future._result()
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect_shell/commands.py", line 98, in shell_run_command
    shell_command, env=current_env, cwd=cwd
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/anyio/_core/_subprocesses.py", line 135, in open_process
    start_new_session=start_new_session,
  File "/home/sgonzalez/venv/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 1112, in open_process
    start_new_session=start_new_session,
  File "/usr/lib/python3.7/asyncio/subprocess.py", line 217, in create_subprocess_exec
    stderr=stderr, **kwds)
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1544, in subprocess_exec
    bufsize, **kwargs)
  File "/usr/lib/python3.7/asyncio/unix_events.py", line 193, in _make_subprocess_transport
    self._child_watcher_callback, transp)
  File "/usr/lib/python3.7/asyncio/unix_events.py", line 941, in add_child_handler
    "Cannot add child handler, "
RuntimeError: Cannot add child handler, the child watcher does not have a loop attached
Here is an example of the code:
@flow
def flow():
   jar_downloaded = .....
   jar_uploaded_to_databricks = ....
   jar_uploaded_to_job = ....
   job_state, _ = jobs_runs_wait_for_completion(...)
   if job_state.get('result_state', {}) == 'SUCCESS':
      result = shell_run_command(command="aws s3 ls s3://my-bucket/ | awk '{gsub(\"/\", \"\", $2); print $2}' | tail -n 1")

David Anderson

04/14/2023, 12:03 PM
Did you discover a solution here? Running into a similar issue…

Santiago Gonzalez

04/14/2023, 12:13 PM
Not yet
I found a solution by using ShellOperation instead of shell_run_command:
from prefect_shell import ShellOperation
from prefect import flow, get_run_logger

@flow
def flow():
    result: list = ShellOperation(commands=['something']).run()
    get_run_logger().info(f'Result: {result}')
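
If the result comes back as a list of output lines (as the `list` annotation above suggests, though that is an assumption worth checking against your prefect-shell version), the awk and tail part of the original pipeline could also be done in Python. A rough sketch; `last_second_field` is a hypothetical helper name and the sample lines are made up:

```python
from typing import List, Optional


def last_second_field(lines: List[str]) -> Optional[str]:
    """Roughly mimic `awk '{gsub("/", "", $2); print $2}' | tail -n 1`
    on a list of already-captured output lines."""
    values = []
    for line in lines:
        fields = line.split()  # whitespace split, like awk's default
        # awk prints an empty string when a line has no second field
        values.append(fields[1].replace("/", "") if len(fields) > 1 else "")
    # tail -n 1: keep only the last value, or None if there was no output
    return values[-1] if values else None


# Made-up sample resembling an `aws s3 ls` prefix listing
sample = ["   PRE 2023-04-01/", "   PRE 2023-04-10/"]
print(last_second_field(sample))
```

With this, the shell command passed to ShellOperation could be just the `aws s3 ls ...` part, and the parsing would stay in the flow where it is easier to log and test.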