Santiago Gonzalez
04/11/2023, 3:05 PMshell_run_command
task. When this task is called, I have the following error:
Encountered exception during execution:
Traceback (most recent call last):
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
result = await flow_call.aresult()
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "flows/flows/landbaron.py", line 149, in generate_lb_sites_flow
ps_details_timestamp = shell_run_command(command=get_ps_details_timestamp_command(export_version))
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/tasks.py", line 491, in __call__
mapped=False,
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
return self.__get_result()
File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
return await future._result()
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/futures.py", line 241, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
result = await call.aresult()
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/home/sgonzalez/venv/lib/python3.7/site-packages/prefect_shell/commands.py", line 98, in shell_run_command
shell_command, env=current_env, cwd=cwd
File "/home/sgonzalez/venv/lib/python3.7/site-packages/anyio/_core/_subprocesses.py", line 135, in open_process
start_new_session=start_new_session,
File "/home/sgonzalez/venv/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 1112, in open_process
start_new_session=start_new_session,
File "/usr/lib/python3.7/asyncio/subprocess.py", line 217, in create_subprocess_exec
stderr=stderr, **kwds)
File "/usr/lib/python3.7/asyncio/base_events.py", line 1544, in subprocess_exec
bufsize, **kwargs)
File "/usr/lib/python3.7/asyncio/unix_events.py", line 193, in _make_subprocess_transport
self._child_watcher_callback, transp)
File "/usr/lib/python3.7/asyncio/unix_events.py", line 941, in add_child_handler
"Cannot add child handler, "
RuntimeError: Cannot add child handler, the child watcher does not have a loop attached
@flow
def flow():
jar_downloaded = .....
jar_uploaded_to_databricks = ....
jar_uploaded_to_job = ....
job_state, _ = jobs_runs_wait_for_completion(...)
if job_state.get('result_state', {}) == 'SUCCESS':
result = shell_run_command(command="aws s3 ls <s3://my-bucket/>" | "awk '{gsub('/', '', $2); print $$2} | tail -n 1")
David Anderson
04/14/2023, 12:03 PMSantiago Gonzalez
04/14/2023, 12:13 PMfrom prefect_shell import ShellOperation
from prefect import flow, get_run_logger
@flow
def flow():
result: list = ShellOperation(commands=['something']).run()
get_run_logger().info(f'Result: {result}')