Ankit
07/13/2023, 1:51 PM
AssertionError
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 1670, in cancel_flow_run
    raise TerminationSignal(signal=signal.SIGTERM)
prefect.exceptions.TerminationSignal

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 1533, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 165, in run_sync_in_interruptible_worker_thread
    assert result is not NotSet
AssertionError
Finished in state Failed('Task run encountered an exception: Traceback (most recent call last):\n File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run\n return loop.run_until_complete(main)\n File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete\n self.run_forever()\n File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever\n self._run_once()\n File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once\n event_list = self._selector.select(timeout)\n File "/usr/lib/python3.8/selectors.py", line 468, in select\n fd_event_list = self._selector.poll(timeout, max_ev)\n File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 1670, in cancel_flow_run\n raise TerminationSignal(signal=signal.SIGTERM)\nprefect.exceptions.TerminationSignal\n\nDuring handling of the above exception, another exception occurred:\n\nAssertionError\n')
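The traceback above has the shape Python prints for implicit exception chaining: an exception raised while another one is being handled keeps the first as its __context__, and the two are printed with "During handling of the above exception, another exception occurred:" between them. As a minimal sketch of that pattern (a simplified, hypothetical stand-in, not Prefect's implementation), a SIGTERM handler can raise a TerminationSignal-like exception, and cleanup code that fails while handling it yields the same TerminationSignal-then-AssertionError chain. It assumes POSIX and must run in the main thread, since Python only lets the main thread install signal handlers; all class and function names here are illustrative.

```python
import os
import signal
import time


class TerminationSignal(Exception):
    """Hypothetical stand-in for prefect.exceptions.TerminationSignal."""

    def __init__(self, signum: int) -> None:
        super().__init__(f"received signal {signum}")
        self.signum = signum


def _raise_termination_signal(signum, frame):
    # Convert the asynchronous SIGTERM into an ordinary exception in the
    # main thread so that try/except/finally blocks get a chance to run.
    raise TerminationSignal(signum)


def run_with_sigterm_capture(work, cleanup):
    """Run work(); on SIGTERM run cleanup() and re-raise (POSIX, main thread only)."""
    previous = signal.signal(signal.SIGTERM, _raise_termination_signal)
    try:
        return work()
    except TerminationSignal:
        # If cleanup() raises, Python attaches the TerminationSignal as the new
        # exception's __context__ and prints "During handling of the above
        # exception, another exception occurred:" between the two tracebacks.
        cleanup()
        raise
    finally:
        # signal.signal() may return None for handlers not set from Python.
        signal.signal(signal.SIGTERM, previous if previous is not None else signal.SIG_DFL)


def simulated_pod_shutdown():
    # Send SIGTERM to ourselves, as Kubernetes does to a container's main
    # process when its pod is terminated.
    os.kill(os.getpid(), signal.SIGTERM)
    time.sleep(5)  # interrupted: the handler raises before this finishes


def failing_cleanup():
    # Loosely mirrors the "assert result is not NotSet" line in the traceback:
    # the interrupted work never produced a result, so a check fails.
    raise AssertionError("result was never set")
```

Calling run_with_sigterm_capture(simulated_pod_shutdown, failing_cleanup) without catching the error prints two tracebacks joined by that "During handling..." line, with the AssertionError last, just like the log above.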
jpuris
07/15/2023, 9:37 AM
prefect.exceptions.TerminationSignal: a SIGTERM was received, possibly from k8s killing the pod (please do check the pod metadata, i.e. is the status terminated [if so, what is the reason] or failed).
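To follow the advice above, the termination details live in the pod's status: phase and reason at pod level, and state.terminated (or lastState.terminated, after a restart) with exitCode and reason for each container. The hedged sketch below, with a hypothetical function name, summarises them from the parsed output of kubectl get pod NAME -o json; the 137/143 hints rely on the usual "128 + signal number" exit-code convention.

```python
from typing import Any, Dict, List

# Exit codes above 128 conventionally mean "killed by signal (code - 128)".
_SIGNAL_HINTS = {137: "128+9: SIGKILL, e.g. OOMKilled", 143: "128+15: SIGTERM"}


def termination_summary(pod: Dict[str, Any]) -> List[str]:
    """Summarise why a pod's containers stopped, given the JSON object
    printed by `kubectl get pod <name> -o json` (parsed into a dict)."""
    status = pod.get("status", {})
    # Pod-level phase (e.g. Failed) and reason (e.g. Evicted), if any.
    lines = [f"phase={status.get('phase')} reason={status.get('reason')}"]
    for cs in status.get("containerStatuses", []):
        # Current state first; fall back to the previous run after a restart.
        terminated = (cs.get("state", {}).get("terminated")
                      or cs.get("lastState", {}).get("terminated"))
        if not terminated:
            continue
        code = terminated.get("exitCode")
        line = f"{cs.get('name')}: exitCode={code} reason={terminated.get('reason')}"
        if code in _SIGNAL_HINTS:
            line += f" ({_SIGNAL_HINTS[code]})"
        lines.append(line)
    return lines
```

For a pod that still exists, termination_summary(json.loads(subprocess.check_output(["kubectl", "get", "pod", POD_NAME, "-o", "json"]))) would feed it, where POD_NAME is a placeholder. Note that exit code 143 only says that SIGTERM arrived, not who sent it.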
When the graceful shutdown was initiated, the code threw another exception, an AssertionError. I do not know what this is...
Ankit
07/15/2023, 5:56 PM
jpuris
07/15/2023, 5:59 PM
Ankit
07/17/2023, 3:57 AM
redsquare
08/02/2023, 9:15 PM
Ankit
08/03/2023, 3:02 AM
jpuris
08/03/2023, 4:12 PM
redsquare
08/03/2023, 4:13 PM
jpuris
08/03/2023, 4:16 PM
btw, how should I distinguish a spot instance shutdown vs. a user-initiated cancel in k8s? Assume I have the ASG with spot instances: when the nodes are being released, I would get SIGTERM in the running pods too.
Source: https://prefect-community.slack.com/archives/C048SVCEFF0/p1690709891457919 If you can reproduce this reliably, an MRE would be super helpful 😕
redsquare
08/03/2023, 4:21 PM
Ankit
08/03/2023, 4:37 PM
redsquare
08/03/2023, 4:38 PM
Ankit
08/04/2023, 2:07 PM
Yuriy Trofimentsev
08/08/2023, 3:12 PM
redsquare
08/08/2023, 3:21 PM
Yuriy Trofimentsev
08/08/2023, 3:24 PM
redsquare
08/08/2023, 3:24 PM
Yuriy Trofimentsev
08/08/2023, 4:07 PM
redsquare
08/08/2023, 4:08 PM
Yuriy Trofimentsev
08/08/2023, 4:10 PM
redsquare
08/08/2023, 4:24 PM
Yuriy Trofimentsev
08/08/2023, 4:38 PM
redsquare
08/08/2023, 4:39 PM
Yuriy Trofimentsev
08/08/2023, 4:41 PM
from copy import deepcopy

import numpy as np
import pandas as pd
from google.cloud import bigquery as bq  # assumed: bq is the BigQuery client library
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

# render, bq_client, save_result and BigQueryParamsMixin are defined elsewhere
# in the poster's project and are not shown in the thread.


@task()
def select(sql_path: str, params: BigQueryParamsMixin) -> pd.DataFrame:
    query = render(sql_path, params=params.render_params)
    job_config = bq.QueryJobConfig(query_parameters=params.bq_params)
    return bq_client.query(query, location='EU', job_config=job_config).to_dataframe()


def _split(params: BigQueryParamsMixin, chunks_size: int = 150_000):
    input_data = getattr(params, 'input_data', pd.DataFrame())
    chunks_number = len(input_data.index) // chunks_size
    if chunks_number >= 1:
        for chunk in np.array_split(input_data, chunks_number):
            params_copy = deepcopy(params)
            # the task which saves the result in GCS
            input_data_path = save_result(chunk)
            setattr(params_copy, 'input_data_path', input_data_path)
            yield params_copy
    else:
        yield params


@flow(task_runner=SequentialTaskRunner())
def select_metric(sql: str, params: BigQueryParamsMixin, from_file: bool = True) -> pd.DataFrame:
    df = pd.DataFrame()
    for params in _split(params):
        # BigQuery has limits on the size of a query.
        # Therefore, it is worth splitting the input
        # into chunks if it's needed.
        df_2 = select(sql, params)
        df = pd.concat([df, df_2])
    return df
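One property of the _split helper above, separate from the SIGTERM issue: because chunks_number uses floor division, np.array_split can return chunks larger than chunks_size (up to just under twice as large; 299,999 rows stay in a single chunk). The hypothetical sketch below compares only the chunk sizes, using the sizing rule documented for numpy.array_split, for the snippet's floor-based count and a ceiling-based alternative; it does not touch BigQuery, GCS or Prefect.

```python
from typing import List


def array_split_sizes(n_rows: int, sections: int) -> List[int]:
    """Chunk sizes numpy.array_split produces, per its documented rule:
    n_rows % sections chunks of n_rows // sections + 1, the rest n_rows // sections."""
    base, extra = divmod(n_rows, sections)
    return [base + 1] * extra + [base] * (sections - extra)


def chunk_sizes_floor(n_rows: int, chunks_size: int = 150_000) -> List[int]:
    """Sizing used by the _split snippet: sections = n_rows // chunks_size."""
    sections = n_rows // chunks_size
    if sections < 1:
        return [n_rows]  # below one chunk the snippet does not split at all
    return array_split_sizes(n_rows, sections)


def chunk_sizes_ceil(n_rows: int, chunks_size: int = 150_000) -> List[int]:
    """Hypothetical alternative: sections = ceil(n_rows / chunks_size),
    so no chunk is larger than chunks_size."""
    sections = max(1, -(-n_rows // chunks_size))  # integer ceiling division
    return array_split_sizes(n_rows, sections)
```

For 1,000,000 rows, chunk_sizes_floor gives six chunks of 166,666 to 166,667 rows, while chunk_sizes_ceil gives seven chunks of 142,857 to 142,858 rows.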
And here is the traceback that is displayed when the error occurs:
Crash details:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1884, in capture_sigterm
    yield
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 681, in create_and_begin_subflow_run
    terminal_state = await orchestrate_flow_run(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 830, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/trebufect/flows/select_metric.py", line 53, in select_metric
    df_2 = select(sql, params)
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/tasks.py", line 730, in submit
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1145, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 242, in wait_for_call_in_loop_thread
    waiter.wait()
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 152, in wait
    self._handle_waiting_callbacks()
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 119, in _handle_waiting_callbacks
    callback: Call = self._queue.get()
                     ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/queue.py", line 171, in get
    self.not_empty.wait()
  File "/usr/local/lib/python3.11/threading.py", line 320, in wait
    waiter.acquire()
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1874, in cancel_flow_run
    raise TerminationSignal(signal=signal.SIGTERM)
prefect.exceptions.TerminationSignal
Prefect Version: 2.11.3
Python Version: 3.11
I have one default agent.
Each subflow runs its own Kubernetes Job.
redsquare
08/09/2023, 11:05 AM
Yuriy Trofimentsev
08/09/2023, 11:14 AM
> oom
Not sure. I monitored the memory usage of the pods and it's fine.
> I changed my prefect tasks that query the db to just normal functions
Yes, I'm sure it could fix the problem if I rewrote the code from tasks to basic functions. However, I'd like to use Prefect with tasks 🙂
redsquare
08/09/2023, 11:15 AM
Ankit
08/09/2023, 11:15 AM
redsquare
08/09/2023, 11:16 AM
Yuriy Trofimentsev
08/09/2023, 11:26 AM
redsquare
08/09/2023, 11:28 AM
Yuriy Trofimentsev
08/10/2023, 1:35 PM
cluster-autoscaler.kubernetes.io/safe-to-evict: "false"
So, here is a piece of my deployment:
job:
  apiVersion: batch/v1
  kind: Job
  metadata:
    labels: {}
  spec:
    template:
      metadata:
        annotations:
          cluster-autoscaler.kubernetes.io/safe-to-evict: "false"
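In the manifest above, the annotation sits under spec.template.metadata.annotations, i.e. on the pod template, which is where the Kubernetes cluster-autoscaler reads it; an annotation on the Job's own metadata would not be copied to its pods. As a hedged sketch, a hypothetical helper that adds it to a Job manifest held as a Python dict could look like this. Note that the annotation only stops the cluster-autoscaler from removing the node during scale-down; it does not stop a cloud provider from reclaiming a spot instance.

```python
import copy
from typing import Any, Dict

SAFE_TO_EVICT = "cluster-autoscaler.kubernetes.io/safe-to-evict"


def with_safe_to_evict_false(job_manifest: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a batch/v1 Job manifest whose pod template carries
    the cluster-autoscaler annotation; the input dict is left unchanged."""
    manifest = copy.deepcopy(job_manifest)
    template = manifest.setdefault("spec", {}).setdefault("template", {})
    annotations = template.setdefault("metadata", {}).setdefault("annotations", {})
    # Annotation values are strings in Kubernetes, hence "false", not False.
    annotations[SAFE_TO_EVICT] = "false"
    return manifest
```

Existing annotations on the pod template are kept; only the safe-to-evict key is set.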
Not sure that it can solve your cases, but anyway.
redsquare
08/10/2023, 1:36 PM
Tanishq Hooda
09/22/2023, 12:13 PM
/usr/local/lib/python3.9/runpy.py:127: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
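This RuntimeWarning comes from Python's runpy module: when a module is run with python -m package.module and importing the package has already imported package.module, runpy warns and then executes the module's code a second time as __main__. On its own it is a warning, not an error. The sketch below reproduces it with a throwaway, hypothetical package named demo_pkg; the only assumption carried over to Prefect is that importing the prefect package already imports prefect.engine, which is what the warning text implies.

```python
import os
import pathlib
import subprocess
import sys
import tempfile
from typing import Tuple


def reproduce_runpy_warning() -> Tuple[str, str]:
    """Run `python -m demo_pkg.engine` on a throwaway package whose __init__
    already imports demo_pkg.engine; return (stdout, stderr)."""
    with tempfile.TemporaryDirectory() as tmp:
        pkg = pathlib.Path(tmp) / "demo_pkg"  # hypothetical package
        pkg.mkdir()
        # The package eagerly imports its own submodule on import.
        (pkg / "__init__.py").write_text("from . import engine\n")
        (pkg / "engine.py").write_text("print('engine module executed')\n")
        env = dict(os.environ, PYTHONPATH=tmp)  # make demo_pkg importable
        proc = subprocess.run(
            [sys.executable, "-W", "default", "-m", "demo_pkg.engine"],
            cwd=tmp,
            env=env,
            capture_output=True,
            text=True,
            check=True,
        )
        return proc.stdout, proc.stderr
```

The returned stdout contains the print twice (once from the package import, once from running the module as __main__), which is the "unpredictable behaviour" the warning refers to.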
Ankit
09/23/2023, 9:04 AM