# ask-community
r
Hello everyone! We have been using Prefect in production for a while now and are very happy with it. But we have a big issue that we have no idea how to fix:
Encountered an exception while waiting for job run completion - [Errno 32] Broken pipe
08:26:17 AM
prefect.flow_runs.worker
An error occurred while monitoring flow run 'bdc14027-a02c-4b68-af6b-cb026e0141ca'. The flow run will not be marked as failed, but an issue may have occurred.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
  File "/usr/local/lib/python3.9/dist-packages/prefect_gcp/workers/cloud_run_v2.py", line 423, in run
    result = await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/asyncutils.py", line 95, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.9/dist-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/dist-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/dist-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.9/dist-packages/prefect_gcp/workers/cloud_run_v2.py", line 694, in _watch_job_execution_and_get_result
    execution = self._watch_job_execution(
  File "/usr/local/lib/python3.9/dist-packages/prefect_gcp/workers/cloud_run_v2.py", line 768, in _watch_job_execution
    execution = ExecutionV2.get(
  File "/usr/local/lib/python3.9/dist-packages/prefect_gcp/models/cloud_run_v2.py", line 360, in get
    response = request.execute()
  File "/usr/local/lib/python3.9/dist-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/googleapiclient/http.py", line 923, in execute
    resp, content = _retry_request(
  File "/usr/local/lib/python3.9/dist-packages/googleapiclient/http.py", line 222, in _retry_request
    raise exception
  File "/usr/local/lib/python3.9/dist-packages/googleapiclient/http.py", line 191, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/google_auth_httplib2.py", line 209, in request
    self.credentials.before_request(self._request, method, uri, request_headers)
  File "/usr/local/lib/python3.9/dist-packages/google/auth/credentials.py", line 228, in before_request
    self._blocking_refresh(request)
  File "/usr/local/lib/python3.9/dist-packages/google/auth/credentials.py", line 191, in _blocking_refresh
    self.refresh(request)
  File "/usr/local/lib/python3.9/dist-packages/google/auth/compute_engine/credentials.py", line 128, in refresh
    self._retrieve_info(request)
  File "/usr/local/lib/python3.9/dist-packages/google/auth/compute_engine/credentials.py", line 101, in _retrieve_info
    info = _metadata.get_service_account_info(
  File "/usr/local/lib/python3.9/dist-packages/google/auth/compute_engine/_metadata.py", line 323, in get_service_account_info
    return get(request, path, params={"recursive": "true"})
  File "/usr/local/lib/python3.9/dist-packages/google/auth/compute_engine/_metadata.py", line 204, in get
    response = request(url=url, method="GET", headers=headers_to_use)
  File "/usr/local/lib/python3.9/dist-packages/google_auth_httplib2.py", line 119, in __call__
    response, data = self.http.request(
  File "/usr/local/lib/python3.9/dist-packages/httplib2/__init__.py", line 1724, in request
    (response, content) = self._request(
  File "/usr/local/lib/python3.9/dist-packages/httplib2/__init__.py", line 1444, in _request
    (response, content) = self._conn_request(conn, request_uri, method, body, headers)
  File "/usr/local/lib/python3.9/dist-packages/httplib2/__init__.py", line 1367, in _conn_request
    conn.request(method, request_uri, body, headers)
  File "/usr/lib/python3.9/http/client.py", line 1255, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.9/http/client.py", line 1301, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.9/http/client.py", line 1250, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.9/http/client.py", line 1010, in _send_output
    self.send(msg)
  File "/usr/lib/python3.9/http/client.py", line 971, in send
    self.sock.sendall(data)
BrokenPipeError: [Errno 32] Broken pipe
This happens for flows that run for more than 1h30, but not every time... Any input on this would be much appreciated 🙂
c
Hi Robin, sorry you're running into this. Judging by the stack trace, this is a bit outside of Prefect. Prefect uses the open-source `googleapiclient` and its underlying `google.*` packages. It appears to me from the stack trace that Google's API client needs to refresh its credentials, and since you're running on an instance inside GCP (I think?), it's trying to call the local GCP metadata server (like the one that's available to each machine at http://metadata.google.internal, as described here: https://cloud.google.com/compute/docs/metadata/querying-metadata), and that HTTP request is timing out or is closed before it responds. I'm wondering if they have any open bug reports about this on their repos?
k
Robin, I've seen something like this before. Are you using the hybrid Cloud Run v2 work pool with a worker running on compute in your GCP project, e.g. as a Cloud Run service, on a Compute Engine VM, or in GKE? If so, do you have a GCP credentials block assigned in your work pool's config? I guess the stack trace implies the answer to the first question.
r
Hello! Thanks both for your answers. Indeed, as you guessed, I'm running my server and worker inside a Compute Engine VM instance. We are running flows using the Cloud Run v2 worker. The weird thing is that it works fine most of the time, but for some long-running flows it happens. I will look a bit into the metadata server and open an issue once I've better identified the problem.
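Since the broken pipe looks transient, one interim workaround is to retry the failing call. A minimal stdlib sketch of the pattern (the wrapper name and the `attempts`/`delay` parameters are made up for illustration; Prefect and google-auth have their own retry machinery, and this does not change their behavior):

```python
import time

def retry_on_broken_pipe(fn, attempts=3, delay=1.0):
    """Call fn(), retrying if the connection is dropped mid-request.

    Illustrative only: shows the general retry pattern for transient
    [Errno 32] Broken pipe failures like the one in the trace above.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except BrokenPipeError:
            if attempt == attempts:
                raise  # give up after the final attempt
            time.sleep(delay)  # brief pause before reconnecting
```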