https://prefect.io logo
a

Ankit

07/13/2023, 1:51 PM
Hi, We are intermittently getting this error in some of our prefect flows running on 2.8.4 and tried increasing resources as well but nothing works. If someone has faced this earlier, please help. Setup is running on kubernetes.
Copy code
AssertionError

Traceback (most recent call last):

  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)

  File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()

  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()

  File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
    event_list = self._selector.select(timeout)

  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)

  File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 1670, in cancel_flow_run
    raise TerminationSignal(signal=signal.SIGTERM)

prefect.exceptions.TerminationSignal


During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 1533, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)

  File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 165, in run_sync_in_interruptible_worker_thread
    assert result is not NotSet

AssertionError

Finished in state Failed('Task run encountered an exception: Traceback (most recent call last):\n  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run\n    return loop.run_until_complete(main)\n  File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete\n    self.run_forever()\n  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever\n    self._run_once()\n  File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once\n    event_list = self._selector.select(timeout)\n  File "/usr/lib/python3.8/selectors.py", line 468, in select\n    fd_event_list = self._selector.poll(timeout, max_ev)\n  File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 1670, in cancel_flow_run\n    raise TerminationSignal(signal=signal.SIGTERM)\nprefect.exceptions.TerminationSignal\n\nDuring handling of the above exception, another exception occurred:\n\nAssertionError\n')
Hi, can someone please help with this? Thanks.
j

jpuris

07/15/2023, 9:37 AM
Your pod is getting killed by k8s. Please see kubernetes pod metadata for the reason this is happening. edit: most likely out of memory or getting evicted 🤷
Actually looking at this again, I do not think this is k8s doing this at all.. it seems prefect is doing graceful shutdown (SIGTERM) due to an exception in cancelling the flow run..
I suspect these are not full logs. It would be interesting to see why the cancellation was emitted in the first place.
Anyhow.. from what I can tell in the logs is that
Copy code
prefect.exceptions.TerminationSignal
a SIGTERM was received.. possibly from k8s, killing the pod (please do check the pod metadata.. i.e. is the status
terminated
[if so, what is the reason] or
failed
) When the graceful shutdown was init, the code threw another exception
Copy code
AssertionError
I do not know what this is..
🙏 1
a

Ankit

07/15/2023, 5:56 PM
Hi @jpuris this is the full logs. I did check all pod metrics and both memory and cpu are over provisioned by 5x of what the flow actually needs. And only 2 out of 40 flows are facing this issue intermittently. At times flow status ends in Success and goes unnoticed that it actually failed.
j

jpuris

07/15/2023, 5:59 PM
What does kubernetes tell you about this pod. Was it terminated or did it exit with bad status i.e. runtime error
a

Ankit

07/17/2023, 3:57 AM
pod was terminated normally, all pod metrics look normal as well
r

redsquare

08/02/2023, 9:15 PM
Hey @Ankit I am seeing the exact same intermittent issues in k8s, have you resolved?
a

Ankit

08/03/2023, 3:02 AM
Hey @redsquare, no it's still happening quite frequently
j

jpuris

08/03/2023, 4:12 PM
Kubernetes is sending a SIGTERM to the pod, which is a graceful shutdown. When such signal has been received, prefect emits following exception TerminationSignal. Unless I'm terribly wrong here, the answer is in kubernetes. You'd need to find out why k8s scales down the pod..
👍 1
r

redsquare

08/03/2023, 4:13 PM
yeah I cannot see any reason - all pod stats look healthy in datadog
1
j

jpuris

08/03/2023, 4:16 PM
Unfortunately I'm not familiar with datadog 🤷 There is no termination reason for the pod? You'd be able to see this in kubectl. Alternative theory is that the flow was cancelled (it may be already fixed).
btw, how should I distinguish a
spot instance
shutdown vs. user initiated
cancel
in k8s? Assume I have the ASG with spot instances, when the nodes are being released, I would get
SIGTERM
in the running pods too
Source: https://prefect-community.slack.com/archives/C048SVCEFF0/p1690709891457919 If you can reproduce this reliably, a MRE would be super helpful 😕
r

redsquare

08/03/2023, 4:21 PM
I'll check the pod termination reason the next time it happens - I ttl them pretty quickly afterwards
I can reproduce this daily btw - long running flow, 3k tasks - randomly fails between 10 mins -> 2 hours and everything in between
image.png
no other pods on the node or in the cluster get terminated around the same time
a

Ankit

08/03/2023, 4:37 PM
I have tried change node types as well, there is no pattern.
r

redsquare

08/03/2023, 4:38 PM
we are on eks
has to be something with the prefect event loop
1
@jpuris I may have something - just got another flow to fail but the flow & agent logs are interesting, the flow appears to start again - notice the downloading flow code
the agent meanwhile reported 2023-08-04T081512.362509340Z 081338.857 | INFO | Flow run 'meek-limpet' - Created task run 'update_rooftop_metrics-3407' for task 'update_rooftop_metrics' 2023-08-04T081512.362511460Z 081338.859 | INFO | Flow run 'meek-limpet' - Executing 'update_rooftop_metrics-3407' immediately... 2023-08-04T081512.362513900Z rpc error: code = NotFound desc = an error occurred when try to find container "f782a10e7abe00b4484840abd243bd0eabdcfc5dcf0e7a242812035ffc31c4e1": not found Fri, Aug 4 2023 91512 am +pip install s3fs==2023.1.0 pydantic==1.10.6 datadog python-dotenv r7insight_python python-dotenv prefect-kv r7insight_python datadog psycopg2-binary clickhouse-connect
I assume the rpc error is k8s logging but cant be 100% - I cannot find that container id anywhere
image.png
I have narrowed this down - I have created a dedicated agent in a container in the same pod for this flow and it has run twice with no issues
@Ankit maybe something for you to try
a

Ankit

08/04/2023, 2:07 PM
Thanks @redsquare, I will also try this out
y

Yuriy Trofimentsev

08/08/2023, 3:12 PM
Hi! My flows also experience this issue. Have you figured out the problem? Today I updated the version of Prefect to 2.11.3, but the problem is still there.
r

redsquare

08/08/2023, 3:21 PM
nope - still trying to find the issue..!
🥲 1
y

Yuriy Trofimentsev

08/08/2023, 3:24 PM
I tried to reduce the number of concurrency limit in work pool and my flows finished successfully(but really long ofk), so, most likely the problem indeed is connected with concurrency issues
r

redsquare

08/08/2023, 3:24 PM
I am using the sequential runner
y

Yuriy Trofimentsev

08/08/2023, 4:07 PM
@redsquare how do you run your flow? Do you have subflows or just sequential tasks inside?
r

redsquare

08/08/2023, 4:08 PM
purely sequential tasks - 4k of them that just do a few http calls
y

Yuriy Trofimentsev

08/08/2023, 4:10 PM
And on visualization panel you see that they are running one by one?
r

redsquare

08/08/2023, 4:24 PM
that doesnt draw correctly for me, the logs suggest they
found one that does
y

Yuriy Trofimentsev

08/08/2023, 4:38 PM
Do you use the results from the previous task in the next one?
r

redsquare

08/08/2023, 4:39 PM
nope
basically iterate a dataframe and call an api for each row
as simple as it gets
or so you would hope
y

Yuriy Trofimentsev

08/08/2023, 4:41 PM
Got it, thanks for answers! I will be back if I find something..
👍 1
This error happens to me from time to time too. I have a flow that launches many other subflows. Some of these subflows are falling. It is noteworthy that they always crash if run together, but if run separately, no errors occur. Most often, the flow with this code falls. I'll share the code in the thread, maybe it will help in finding the error.
Copy code
@task()
def select(sql_path: str, params: BigQueryParamsMixin) -> pd.DataFrame:
    query = render(sql_path, params=params.render_params)
    job_config = bq.QueryJobConfig(query_parameters=params.bq_params)

    return bq_client.query(query, location='EU', job_config=job_config).to_dataframe()


def _split(params: BigQueryParamsMixin, chunks_size: int = 150_000):
    input_data = getattr(params, 'input_data', pd.DataFrame())
    chunks_number = len(input_data.index) // chunks_size

    if chunks_number >= 1:
        for chunk in np.array_split(input_data, chunks_number):
            params_copy = deepcopy(params)
            # the task which saves the result in GCS
            input_data_path = save_result(chunk) 
            setattr(params_copy, 'input_data_path', input_data_path)
            yield params_copy
    else:
        yield params


@flow(task_runner=SequentialTaskRunner())
def select_metric(sql: str, params: BigQueryParamsMixin, from_file: bool = True) -> pd.DataFrame:
    df = pd.DataFrame()

    for params in _split(params):
        #  BigQuery has limits on the size of a query. 
        #  Therefore, it is worth splitting the input 
        #  into chunks if it's needed.
        df_2 = select(sql, params)
        df = pd.concat([df, df_2])

    return df
And also traceback which is displayed when the error occurs:
Copy code
Crash details:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1884, in capture_sigterm
    yield
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 681, in create_and_begin_subflow_run
    terminal_state = await orchestrate_flow_run(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 830, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/trebufect/flows/select_metric.py", line 53, in select_metric
    df_2 = select(sql, params)
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/tasks.py", line 730, in submit
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1145, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 242, in wait_for_call_in_loop_thread
    waiter.wait()
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 152, in wait
    self._handle_waiting_callbacks()
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 119, in _handle_waiting_callbacks
    callback: Call = self._queue.get()
                     ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/queue.py", line 171, in get
    self.not_empty.wait()
  File "/usr/local/lib/python3.11/threading.py", line 320, in wait
    waiter.acquire()
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1874, in cancel_flow_run
    raise TerminationSignal(signal=signal.SIGTERM)
prefect.exceptions.TerminationSignal
Prefect Version:
2.11.3
Python Version:
3.11
I have one default agent. Each subflow runs its own Kubernetes Job.
Could someone take a look and help us please? 🙏
r

redsquare

08/09/2023, 11:05 AM
@Yuriy Trofimentsev oom?
@Yuriy Trofimentsev I have fixed my issue btw - I changed my prefect tasks that query the db to just normal functions
y

Yuriy Trofimentsev

08/09/2023, 11:14 AM
@redsquare
oom
Not sure. I monitored the memory usage of pods and it's fine.
I changed my prefect tasks that query the db to just normal functions
Yes, I'm sure that it can fix the problem if rewrite the code from tasks to basic functions. However, I'd like to use Prefect with tasks 🙂
r

redsquare

08/09/2023, 11:15 AM
yeah - I just needed mine to run to completion!
wasted 4 days trying to work it out
a

Ankit

08/09/2023, 11:15 AM
I instead used Sequential Task Runner, it still fails but not that frequently.
1
r

redsquare

08/09/2023, 11:16 AM
yeah I already have that
@Yuriy Trofimentsev are yours long lasting jobs?
y

Yuriy Trofimentsev

08/09/2023, 11:26 AM
Generally 1-40 mins it depends of data size. However, the flow can fall both after 1.5 minutes and after 30. Also, the size of the input data does not affect whether the flow will fall or not. Usually from a set of 12 flows, it falls about 3-4, but unsuccessful flows are usually different.
r

redsquare

08/09/2023, 11:28 AM
yeah, there is nothing to go off due to the randomness
y

Yuriy Trofimentsev

08/10/2023, 1:35 PM
In my case the problem was in cluster-autoscaller. I set the next annotation to prevent this behavior:
<http://cluster-autoscaler.kubernetes.io/safe-to-evict|cluster-autoscaler.kubernetes.io/safe-to-evict>: "false"
So, piece from my deployment:
Copy code
job:
    apiVersion: batch/v1
    kind: Job
    metadata:
      labels: {}
    spec:
      template:
        metadata:
          annotations:
            <http://cluster-autoscaler.kubernetes.io/safe-to-evict|cluster-autoscaler.kubernetes.io/safe-to-evict>: "false"
Not sure, that it can solve your cases.. but anyway.
r

redsquare

08/10/2023, 1:36 PM
interesting - but I would expect this to show in the logs somewhere
t

Tanishq Hooda

09/22/2023, 12:13 PM
Even I'm facing this issue. Were you able to resolve it ? @Yuriy Trofimentsev @redsquare @Ankit? Even my tasks are long (2-3 hours) Prefect Version : 2.10.13 Apart from Termination error, I also have a lot of these in my logs (Prefect community says it can be ignored), wondering if they are related. Anyway I've attached the complete traceback.
Copy code
/usr/local/lib/python3.9/runpy.py:127: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour warn(RuntimeWarning(msg))
a

Ankit

09/23/2023, 9:04 AM
No, still facing this. We are moving out of prefect for critical flows and exploring other options.
9 Views