# ask-marvin
b
@Marvin how should I deal with this Broken Pipe error??
```
An error occurred while monitoring flow run '068aff45-e8db-7a30-8000-3a21c44797fd'. The flow run will not be marked as failed, but an issue may have occurred.
Traceback (most recent call last):
  File "/app/.venv/lib/python3.13/site-packages/prefect_gcp/models/cloud_run_v2.py", line 363, in get
    response = request.execute()
  File "/app/.venv/lib/python3.13/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/app/.venv/lib/python3.13/site-packages/googleapiclient/http.py", line 923, in execute
    resp, content = _retry_request(
                    ~~~~~~~~~~~~~~^
        http,
        ^^^^^
    ...<7 lines>...
        headers=self.headers,
        ^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/googleapiclient/http.py", line 222, in _retry_request
    raise exception
  File "/app/.venv/lib/python3.13/site-packages/googleapiclient/http.py", line 191, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
                    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/google_auth_httplib2.py", line 209, in request
    self.credentials.before_request(self._request, method, uri, request_headers)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/google/auth/credentials.py", line 239, in before_request
    self._blocking_refresh(request)
    ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/google/auth/credentials.py", line 202, in _blocking_refresh
    self.refresh(request)
    ~~~~~~~~~~~~^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/google/oauth2/service_account.py", line 448, in refresh
    access_token, expiry, _ = _client.jwt_grant(
                              ~~~~~~~~~~~~~~~~~^
        request, self._token_uri, assertion
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/google/oauth2/_client.py", line 299, in jwt_grant
    response_data = _token_endpoint_request(
        request,
    ...<5 lines>...
        },
    )
  File "/app/.venv/lib/python3.13/site-packages/google/oauth2/_client.py", line 259, in _token_endpoint_request
    response_status_ok, response_data, retryable_error = _token_endpoint_request_no_throw(
                                                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
        request,
        ^^^^^^^^
    ...<6 lines>...
        **kwargs
        ^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/google/oauth2/_client.py", line 192, in _token_endpoint_request_no_throw
    response = request(
        method="POST", url=token_uri, headers=headers_to_use, body=body, **kwargs
    )
  File "/app/.venv/lib/python3.13/site-packages/google_auth_httplib2.py", line 119, in __call__
    response, data = self.http.request(
                     ~~~~~~~~~~~~~~~~~^
        url, method=method, body=body, headers=headers, **kwargs
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/httplib2/__init__.py", line 1724, in request
    (response, content) = self._request(
                          ~~~~~~~~~~~~~^
        conn, authority, uri, request_uri, method, body, headers, redirections, cachekey,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/httplib2/__init__.py", line 1444, in _request
    (response, content) = self._conn_request(conn, request_uri, method, body, headers)
                          ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/ssl.py", line 1232, in send
    return self._sslobj.write(data)
           ~~~~~~~~~~~~~~~~~~^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
```
02:13:23 PM

Error: Encountered an exception while waiting for job run completion - [Errno 32] Broken pipe
m
thought for 303.5 seconds
Thanks for sharing the full traceback; that helps. Here's what's going on and how to tackle it.

What the error means
- The error is thrown while the Prefect GCP integration is polling Cloud Run v2 for your job's status. The stack shows a failure during Google's OAuth token refresh (via google-auth + httplib2) with a BrokenPipeError at the SSL layer. Prefect warns that monitoring failed but does not mark your flow run as failed.

Most likely causes
- Transient network hiccup while refreshing the access token
- Use of service-account JSON keys (token refresh hits Google's external token endpoint) vs. ADC/metadata server
- Library/runtime compatibility (you're on Python 3.13 per the traceback; some Google libs + httplib2 have been flaky on newer Python versions)
- Proxies/middleboxes closing idle or long-lived connections

What to do next

1) Check the actual Cloud Run job outcome
- The monitoring failed, but your job may have completed fine. Verify in the Cloud Console or with gcloud:
```bash
gcloud run jobs executions list --region=REGION --job=JOB_NAME
gcloud run jobs executions describe EXECUTION_ID --region=REGION
```
2) Try a quick re-run
- If this was transient network noise, a re-run will often succeed.

3) Prefer ADC over JSON keys (if possible)
- If your worker runs on GCP (Cloud Run, GKE, GCE), attach a service account to the runtime and rely on Application Default Credentials (metadata server). This avoids outbound JWT token refresh calls and is much more reliable.
- If running outside GCP, consider Workload Identity Federation rather than long-lived JSON keys.

4) Pin to a stable Python and update the Google libs
- Given you're on Python 3.13, try 3.11 or 3.12 to rule out runtime issues.
- Update these packages to current versions:
```bash
pip install -U prefect prefect-gcp google-api-python-client google-auth google-auth-httplib2 httplib2
```
- Then re-deploy your worker/flow image.

5) Check for proxies and timeouts
- If you're behind a proxy/NAT, ensure `HTTPS_PROXY`/`NO_PROXY` are set appropriately, and that the proxy is not closing idle TLS connections.
- Ensure the environment has stable egress to Google APIs.

6) Add retries at the orchestration layer
- As a safety net, add a retry policy to your flow so a run that fails around a monitoring hiccup gets retried (see the sketch below).

Docs
- GCP worker guide and best practices: Prefect GCP Worker Guide

A couple of questions to help us zero in:
- Which versions of Prefect and prefect-gcp are you on?
- Where is your worker running (Cloud Run, GKE, local, other)?
- Are you using a JSON key file or ADC/metadata-based credentials?
- How often do you see this error, and does a re-run usually work?

If you share those details, I can suggest a more targeted fix (and, if needed, we can look into a tweak to the polling/retry behavior for Cloud Run v2 monitoring).
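For item 6, here is a minimal sketch of a flow-level retry policy. `retries` and `retry_delay_seconds` are standard `@flow` arguments; the flow and task names are placeholders. Note that retries only kick in when the run is actually marked failed; the monitoring warning above does not fail the run by itself.

```python
from prefect import flow, task

@task
def extract() -> list[int]:
    # Placeholder task; swap in your real logic.
    return [1, 2, 3]

@flow(retries=2, retry_delay_seconds=60)
def my_pipeline():
    # If the run fails (e.g. an infrastructure-level error that does mark
    # the run failed), Prefect re-runs the flow up to `retries` times,
    # waiting `retry_delay_seconds` between attempts.
    data = extract()
    print(f"processed {len(data)} records")

if __name__ == "__main__":
    my_pipeline()
```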
b
@Marvin When this happens, the Cloud Run Job still works fine. The error appears in the Prefect UI logs only.
m
thought for 177.4 seconds
Got it; thanks for confirming the Cloud Run Job completes fine. In this case, the error comes from the monitoring/polling step (token refresh via google-auth/httplib2) and is cosmetic: Prefect logs it, but doesn't mark your flow run failed. You've got a few options:

Pragmatic mitigation (least invasive)
- Treat it as noise if it's rare. The job result is correct and Prefect explicitly avoids failing the run.
- If you want to reduce the chance of it occurring:
  - Prefer Application Default Credentials (metadata-based) over JSON key files where possible.
  - Update dependencies and, if you can, avoid Python 3.13 for now:
```bash
pip install -U prefect prefect-gcp google-api-python-client google-auth google-auth-httplib2 httplib2
```
If feasible, try Python 3.11 or 3.12.

Suppress the log in the Prefect UI
- You can filter out just this message with a custom logging config so it won't clutter your Prefect logs. Create a small filter and a logging config, then point Prefect at it with `PREFECT_LOGGING_SETTINGS_PATH`.

filters.py:
```python
import logging

class IgnoreBrokenPipe(logging.Filter):
    """Drop the noisy BrokenPipe messages from the Cloud Run monitor."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Returning False suppresses the record; True lets it through.
        msg = record.getMessage()
        return (
            "BrokenPipeError: [Errno 32] Broken pipe" not in msg
            and "Encountered an exception while waiting for job run completion" not in msg
        )
```

logging.yaml:
```yaml
version: 1
disable_existing_loggers: False

filters:
  ignore_broken_pipe:
    (): yourpkg.filters.IgnoreBrokenPipe

formatters:
  default:
    format: "%(asctime)s | %(levelname)s | %(name)s | %(message)s"

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: default
    filters: [ignore_broken_pipe]

# Apply the filter to Prefect logs (and optionally the GCP integration logger if present)
loggers:
  prefect:
    level: INFO
    handlers: [console]
    propagate: False
  prefect_gcp:
    level: INFO
    handlers: [console]
    propagate: False
    filters: [ignore_broken_pipe]
```

Then set:
```bash
export PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yaml
```

Hardening (if it's frequent)
- Switch to ADC (metadata server) on GCP runtimes to avoid external token refresh calls entirely.
- Confirm you're not behind a proxy or NAT that closes idle TLS connections; if you are, configure keep-alives/timeouts so long polls don't hit BrokenPipe.

Docs
- GCP worker guide: Prefect GCP Worker Guide
- Logging customization: Prefect 3 logging customization

If you want, I can open an issue to improve the monitoring path so this is retried and logged at WARNING instead of ERROR. If so, could you share your versions of Prefect, prefect-gcp, and Python?
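Before wiring the config into Prefect, you can sanity-check the filter locally. A minimal sketch, assuming the `filters.py` module above is importable from the current directory:

```python
import logging

from filters import IgnoreBrokenPipe  # the filters.py module shown above

logger = logging.getLogger("prefect_gcp")
handler = logging.StreamHandler()
handler.addFilter(IgnoreBrokenPipe())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Suppressed: contains one of the filtered substrings.
logger.error(
    "Encountered an exception while waiting for job run completion "
    "- [Errno 32] Broken pipe"
)
# Printed: ordinary messages still pass the filter.
logger.info("Cloud Run job submitted")
```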
b
@Marvin but Prefect will stop listening to Cloud Run and stop updating the flow/task status
m
thought for 195.4 seconds
Thanks for the clarification; that helps. Two things are happening at once:
1) The worker's Cloud Run monitor crashes on a transient Google API call (BrokenPipeError) and stops polling/streaming logs.
2) Your UI also stops getting state/log updates from the flow itself.

In Prefect 3, those are separate paths:
- Worker monitoring: submits the Cloud Run Job and polls it (also streams infra logs).
- Flow engine (inside your Cloud Run container): reports task/flow states and logs directly to the Prefect API.

So if the UI stops updating states when the monitoring fails, it usually means your flow container cannot (or can no longer) reach the Prefect API. The Cloud Run job keeps running, but the engine can't report progress.

What I recommend

Short-term checks to keep UI updates flowing
- Verify the flow container has the Prefect API env vars: `PREFECT_API_URL` and `PREFECT_API_KEY` must be present inside the Cloud Run job container. If you customized your work pool's base job template, make sure these weren't accidentally removed.
- Confirm outbound egress to the Prefect API: if you use a VPC connector/private egress, allow outbound HTTPS to `api.prefect.cloud` (443). Otherwise, the engine can't post states/logs when network blips occur.
- Look in your Cloud Run job logs for messages like "Failed to reach Prefect API" or repeated HTTP retry warnings.
- Set explicit request timeouts/retries for the Prefect client (optional, but it can help with brief blips) by adding these env vars to the job:
```bash
PREFECT_API_REQUESTS_TIMEOUT=30
PREFECT_API_REQUESTS_MAX_RETRIES=5
```
- Keep dependencies current and avoid Python 3.13 for now:
```bash
pip install -U prefect prefect-gcp google-api-python-client google-auth google-auth-httplib2 httplib2
```
If you can, run the job on Python 3.11 or 3.12.

Reduce/avoid the monitoring crash
- Prefer ADC over JSON keys: attach a service account to the runtime and rely on GCP metadata-based credentials so token refresh doesn't hit the external token endpoint (where the BrokenPipe happens).
- If you must use JSON keys, make sure network egress is stable for Google OAuth token refresh. Proxies/NATs that close idle TLS connections are common culprits.

About the worker stopping "listening"
- Right now, a BrokenPipe in the Cloud Run v2 polling path aborts monitoring. That's why you stop seeing infra-log streaming. The flow should still report states on its own if the container can reach the Prefect API.
- I'd like to get this improved to retry and keep polling rather than aborting. If you can share:
  - Prefect version, prefect-gcp version, Python version
  - where the worker runs and where the job runs
  - whether you're using ADC or a JSON key
  - whether you use a proxy/VPC connector
  I can open a GitHub issue to add retries around the Cloud Run v2 monitor and downgrade this to a warning when recoverable.

Reference
- GCP worker guide: Prefect GCP Worker Guide

If you'd like, I can also help review your work pool's base job template to ensure the Prefect API env vars are passed into the Cloud Run job; one way to set extra env vars per deployment is sketched below.
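For illustration, a hedged sketch of passing those client-timeout env vars into the Cloud Run job via `job_variables` on a Prefect 3 deployment. The `env` job variable is merged into the work pool's base job template; the pool name and image below are placeholders, and the setting names are the ones suggested above:

```python
from prefect import flow

@flow(log_prints=True)
def my_pipeline():
    print("hello from Cloud Run")

if __name__ == "__main__":
    my_pipeline.deploy(
        name="cloud-run-example",
        work_pool_name="my-cloud-run-pool",  # placeholder work pool
        image="us-docker.pkg.dev/my-project/my-repo/flow:latest",  # placeholder image
        # Extra env vars land inside the Cloud Run job container alongside the
        # PREFECT_API_URL / PREFECT_API_KEY values injected by the worker.
        job_variables={
            "env": {
                "PREFECT_API_REQUESTS_TIMEOUT": "30",
                "PREFECT_API_REQUESTS_MAX_RETRIES": "5",
            }
        },
    )
```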