Bao Pham
08/28/2025, 8:32 AM
An error occurred while monitoring flow run '068aff45-e8db-7a30-8000-3a21c44797fd'. The flow run will not be marked as failed, but an issue may have occurred.
Traceback (most recent call last):
File "/app/.venv/lib/python3.13/site-packages/prefect_gcp/models/cloud_run_v2.py", line 363, in get
response = request.execute()
File "/app/.venv/lib/python3.13/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
return wrapped(*args, **kwargs)
File "/app/.venv/lib/python3.13/site-packages/googleapiclient/http.py", line 923, in execute
resp, content = _retry_request(
~~~~~~~~~~~~~~^
http,
^^^^^
...<7 lines>...
headers=self.headers,
^^^^^^^^^^^^^^^^^^^^^
)
^
File "/app/.venv/lib/python3.13/site-packages/googleapiclient/http.py", line 222, in _retry_request
raise exception
File "/app/.venv/lib/python3.13/site-packages/googleapiclient/http.py", line 191, in _retry_request
resp, content = http.request(uri, method, *args, **kwargs)
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.13/site-packages/google_auth_httplib2.py", line 209, in request
self.credentials.before_request(self._request, method, uri, request_headers)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.13/site-packages/google/auth/credentials.py", line 239, in before_request
self._blocking_refresh(request)
~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^
File "/app/.venv/lib/python3.13/site-packages/google/auth/credentials.py", line 202, in _blocking_refresh
self.refresh(request)
~~~~~~~~~~~~^^^^^^^^^
File "/app/.venv/lib/python3.13/site-packages/google/oauth2/service_account.py", line 448, in refresh
access_token, expiry, _ = _client.jwt_grant(
~~~~~~~~~~~~~~~~~^
request, self._token_uri, assertion
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/app/.venv/lib/python3.13/site-packages/google/oauth2/_client.py", line 299, in jwt_grant
response_data = _token_endpoint_request(
request,
...<5 lines>...
},
)
File "/app/.venv/lib/python3.13/site-packages/google/oauth2/_client.py", line 259, in _token_endpoint_request
response_status_ok, response_data, retryable_error = _token_endpoint_request_no_throw(
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
request,
^^^^^^^^
...<6 lines>...
**kwargs
^^^^^^^^
)
^
File "/app/.venv/lib/python3.13/site-packages/google/oauth2/_client.py", line 192, in _token_endpoint_request_no_throw
response = request(
method="POST", url=token_uri, headers=headers_to_use, body=body, **kwargs
)
File "/app/.venv/lib/python3.13/site-packages/google_auth_httplib2.py", line 119, in __call__
response, data = self.http.request(
~~~~~~~~~~~~~~~~~^
url, method=method, body=body, headers=headers, **kwargs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/app/.venv/lib/python3.13/site-packages/httplib2/__init__.py", line 1724, in request
(response, content) = self._request(
~~~~~~~~~~~~~^
conn, authority, uri, request_uri, method, body, headers, redirections, cachekey,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/app/.venv/lib/python3.13/site-packages/httplib2/__init__.py", line 1444, in _request
(response, content) = self._conn_request(conn, request_uri, method, body, headers)
~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.13/ssl.py", line 1232, in send
return self._sslobj.write(data)
~~~~~~~~~~~~~~~~~~^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
02:13:23 PM
Error
Encountered an exception while waiting for job run completion - [Errno 32] Broken pipe
Marvin
08/28/2025, 8:32 AM
Marvin
08/28/2025, 8:37 AM
1) Inspect the Cloud Run execution for details
gcloud run jobs executions list --region=REGION --job=JOB_NAME
gcloud run jobs executions describe EXECUTION_ID --region=REGION
2) Try a quick re-run
- If this was transient network noise, a re-run will often succeed.
3) Prefer ADC over JSON keys (if possible)
- If your worker runs on GCP (Cloud Run, GKE, GCE), attach a Service Account to the runtime and rely on Application Default Credentials (metadata server). This avoids outbound JWT token refresh calls and is much more reliable.
- If running outside GCP, consider Workload Identity Federation rather than long‑lived JSON keys.
4) Pin to a stable Python and update Google libs
- Given you’re on Python 3.13, try 3.11 or 3.12 to rule out runtime issues.
- Update these packages to current versions:
pip install -U prefect prefect-gcp google-api-python-client google-auth google-auth-httplib2 httplib2
- Then re-deploy your worker/flow image.
5) Check for proxies and timeouts
- If you’re behind a proxy/NAT, ensure `HTTPS_PROXY`/`NO_PROXY` are set appropriately, and that the proxy is not closing idle TLS connections.
- Ensure the environment has stable egress to Google APIs.
6) Add retries at the orchestration layer
- As a safety net, add a retry policy to your flow so a monitoring hiccup retries the run.
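Prefect supports this natively via `@flow(retries=..., retry_delay_seconds=...)`. The same safety-net pattern can be sketched in plain Python (a hypothetical `with_retries` helper, not Prefect's own implementation) to show what "retry on transient network errors" means:

```python
import time

def with_retries(fn, attempts=3, delay=0.0, retry_on=(BrokenPipeError, ConnectionError)):
    """Call fn, retrying on transient network errors (hypothetical helper)."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the error
            time.sleep(delay)  # back off before the next attempt

# Example: a call that fails once with a broken pipe, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] == 1:
        raise BrokenPipeError(32, "Broken pipe")
    return "ok"

print(with_retries(flaky))  # -> ok
```

With a policy like this, a single BrokenPipe during monitoring becomes a retried attempt rather than an aborted run.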
Docs
- GCP worker guide and best practices: Prefect GCP Worker Guide
A couple of questions to help us zero in:
- Which versions of Prefect and prefect-gcp are you on?
- Where is your worker running (Cloud Run, GKE, local, other)?
- Are you using a JSON key file or ADC/metadata-based credentials?
- How often do you see this error, and does a re-run usually work?
If you share those details, I can suggest a more targeted fix (and if needed, we can look into a tweak to the polling/retry behavior for Cloud Run v2 monitoring).
Bao Pham
08/28/2025, 8:45 AM
Marvin
08/28/2025, 8:46 AM
Marvin
08/28/2025, 8:49 AM
pip install -U prefect prefect-gcp google-api-python-client google-auth google-auth-httplib2 httplib2
If feasible, try Python 3.11 or 3.12.
Suppress the log in Prefect UI
- You can filter out just this message with a custom logging config so it won’t clutter your Prefect logs. Create a small filter and a logging config, then point Prefect at it with PREFECT_LOGGING_SETTINGS_PATH.
filters.py:
import logging

class IgnoreBrokenPipe(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        msg = record.getMessage() or ""
        return (
            "BrokenPipeError: [Errno 32] Broken pipe" not in msg
            and "Encountered an exception while waiting for job run completion" not in msg
        )
logging.yaml:
version: 1
disable_existing_loggers: False
filters:
  ignore_broken_pipe:
    (): yourpkg.filters.IgnoreBrokenPipe
formatters:
  default:
    format: "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: default
    filters: [ignore_broken_pipe]
# Apply the filter to Prefect logs (and optionally the GCP integration logger if present)
loggers:
  prefect:
    level: INFO
    handlers: [console]
    propagate: False
  prefect_gcp:
    level: INFO
    handlers: [console]
    propagate: False
    filters: [ignore_broken_pipe]
Then set:
export PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yaml
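As a quick local sanity check, independent of Prefect and using only the standard library, you can verify the filter drops the noisy record while keeping everything else (the `ListHandler` and logger name here are just for the demo):

```python
import logging

class IgnoreBrokenPipe(logging.Filter):
    # Same filter as in filters.py above
    def filter(self, record: logging.LogRecord) -> bool:
        msg = record.getMessage()
        return (
            "BrokenPipeError: [Errno 32] Broken pipe" not in msg
            and "Encountered an exception while waiting for job run completion" not in msg
        )

captured = []

class ListHandler(logging.Handler):
    # Collects emitted messages so we can inspect them
    def emit(self, record):
        captured.append(record.getMessage())

logger = logging.getLogger("prefect.demo")
logger.setLevel(logging.INFO)
handler = ListHandler()
handler.addFilter(IgnoreBrokenPipe())
logger.addHandler(handler)
logger.propagate = False

# The noisy record is filtered out; the normal record passes through.
logger.error("Encountered an exception while waiting for job run completion - [Errno 32] Broken pipe")
logger.info("Flow run completed")

print(captured)  # -> ['Flow run completed']
```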
Hardening (if it’s frequent)
- Switch to ADC (metadata server) on GCP runtimes to avoid external token refresh calls entirely.
- Confirm you’re not behind a proxy or NAT that closes idle TLS connections; if you are, ensure keep-alives/timeouts are configured to avoid BrokenPipe on long polls.
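At the OS level, the mechanism that prevents an intermediary from silently dropping an idle connection is TCP keep-alive. A minimal standard-library sketch (illustrative of the socket option itself; httplib2 does not expose this knob directly, so treat it as background rather than a drop-in fix):

```python
import socket

# Enable TCP keep-alive so the OS periodically probes idle connections
# instead of letting a proxy/NAT silently drop them (which later surfaces
# as BrokenPipeError on the next write).
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

# Nonzero when enabled (1 on Linux)
print(sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE))
sock.close()
```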
Docs
- GCP worker guide: Prefect GCP Worker Guide
- Logging customization: Prefect 3 logging customization
If you want, I can open an issue to improve the monitoring path so this is retried and logged at WARNING instead of ERROR. If so, could you share your versions of Prefect, prefect-gcp, and Python?
Bao Pham
08/28/2025, 8:55 AM
Marvin
08/28/2025, 8:55 AM
Marvin
08/28/2025, 8:58 AM
- Ensure API credentials reach the job: PREFECT_API_URL and PREFECT_API_KEY must be present inside the Cloud Run job container. If you customized your work pool’s base job template, make sure these aren’t accidentally removed.
- Confirm outbound egress to the Prefect API:
- If you use a VPC connector/private egress, allow outbound HTTPS to api.prefect.cloud (443). Otherwise, the engine can’t post state/logs when the network blips occur.
- Look in your Cloud Run job logs for messages like “Failed to reach Prefect API” or repeated HTTP retry warnings.
- Set explicit request timeouts/retries for the Prefect client (optional but can help with brief blips):
- Add these env vars to the job:
PREFECT_API_REQUESTS_TIMEOUT=30
PREFECT_API_REQUESTS_MAX_RETRIES=5
- Keep dependencies current and avoid Python 3.13 for now:
pip install -U prefect prefect-gcp google-api-python-client google-auth google-auth-httplib2 httplib2
If you can, run the job on Python 3.11 or 3.12.
Reduce/avoid the monitoring crash
- Prefer ADC over JSON keys: Attach a service account to the runtime and rely on GCP metadata-based credentials so token refresh doesn’t hit the external token endpoint (where the BrokenPipe happens).
- If you must use JSON keys, make sure network egress is stable for Google OAuth token refresh. Proxies/NATs that close idle TLS connections are common culprits.
About the worker stopping “listening”
- Right now, a BrokenPipe in the Cloud Run v2 polling path aborts monitoring. That’s why you stop seeing infra-log streaming. The flow should still report states on its own if the container can reach the Prefect API.
- I’d like to get this improved to retry and keep polling rather than aborting. If you can share:
- Prefect version, prefect-gcp version, Python version
- Where the worker runs and where the job runs
- Whether you’re using ADC or a JSON key
- Whether you use a proxy/VPC connector
I can open a GitHub issue to add retries around the Cloud Run v2 monitor and downgrade this to a warning when recoverable.
Reference
- GCP worker guide: Prefect GCP Worker Guide
If you’d like, I can also help review your work pool’s base job template to ensure the Prefect API env vars are passed into the Cloud Run job.