Hi! We have been getting this Broken Pipe error on...
# ask-community
b
Hi! We have been getting this Broken Pipe error in the Prefect UI after a flow has been running for more than 1.5 hours. However, our Cloud Run Job for that flow is still running in GCP. How should we approach this? If we retry the flow, it will redo all the work. Thanks!
Copy code
An error occurred while monitoring flow run '068aff45-e8db-7a30-8000-3a21c44797fd'. The flow run will not be marked as failed, but an issue may have occurred.
Traceback (most recent call last):
  File "/app/.venv/lib/python3.13/site-packages/prefect/workers/base.py", line 1263, in _submit_run_and_capture_errors
    result = await self.run(
             ^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/prefect_gcp/workers/cloud_run_v2.py", line 660, in run
    result = await run_sync_in_worker_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<5 lines>...
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
    result = await anyio.to_thread.run_sync(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        call_with_mark, call, abandon_on_cancel=True, limiter=get_thread_limiter()
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        func, args, abandon_on_cancel=abandon_on_cancel, limiter=limiter
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    result = context.run(func, *args)
  File "/app/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
    return call()
  File "/app/.venv/lib/python3.13/site-packages/prefect_gcp/workers/cloud_run_v2.py", line 891, in _watch_job_execution_and_get_result
    execution = self._watch_job_execution(
        cr_client=cr_client,
    ...<2 lines>...
        poll_interval=poll_interval,
    )
  File "/app/.venv/lib/python3.13/site-packages/prefect_gcp/workers/cloud_run_v2.py", line 962, in _watch_job_execution
    execution = ExecutionV2.get(
        cr_client=cr_client,
        execution_id=execution.name,
    )
  File "/app/.venv/lib/python3.13/site-packages/prefect_gcp/models/cloud_run_v2.py", line 363, in get
    response = request.execute()
  File "/app/.venv/lib/python3.13/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/app/.venv/lib/python3.13/site-packages/googleapiclient/http.py", line 923, in execute
    resp, content = _retry_request(
                    ~~~~~~~~~~~~~~^
        http,
        ^^^^^
    ...<7 lines>...
        headers=self.headers,
        ^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/googleapiclient/http.py", line 222, in _retry_request
    raise exception
  File "/app/.venv/lib/python3.13/site-packages/googleapiclient/http.py", line 191, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
                    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/google_auth_httplib2.py", line 209, in request
    self.credentials.before_request(self._request, method, uri, request_headers)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/google/auth/credentials.py", line 239, in before_request
    self._blocking_refresh(request)
    ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/google/auth/credentials.py", line 202, in _blocking_refresh
    self.refresh(request)
    ~~~~~~~~~~~~^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/google/oauth2/service_account.py", line 448, in refresh
    access_token, expiry, _ = _client.jwt_grant(
                              ~~~~~~~~~~~~~~~~~^
        request, self._token_uri, assertion
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/google/oauth2/_client.py", line 299, in jwt_grant
    response_data = _token_endpoint_request(
        request,
    ...<5 lines>...
        },
    )
  File "/app/.venv/lib/python3.13/site-packages/google/oauth2/_client.py", line 259, in _token_endpoint_request
    response_status_ok, response_data, retryable_error = _token_endpoint_request_no_throw(
                                                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^
        request,
        ^^^^^^^^
    ...<6 lines>...
        **kwargs
        ^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/google/oauth2/_client.py", line 192, in _token_endpoint_request_no_throw
    response = request(
        method="POST", url=token_uri, headers=headers_to_use, body=body, **kwargs
    )
  File "/app/.venv/lib/python3.13/site-packages/google_auth_httplib2.py", line 119, in __call__
    response, data = self.http.request(
                     ~~~~~~~~~~~~~~~~~^
        url, method=method, body=body, headers=headers, **kwargs
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/httplib2/__init__.py", line 1724, in request
    (response, content) = self._request(
                          ~~~~~~~~~~~~~^
        conn, authority, uri, request_uri, method, body, headers, redirections, cachekey,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/httplib2/__init__.py", line 1444, in _request
    (response, content) = self._conn_request(conn, request_uri, method, body, headers)
                          ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/httplib2/__init__.py", line 1367, in _conn_request
    conn.request(method, request_uri, body, headers)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/http/client.py", line 1338, in request
    self._send_request(method, url, body, headers, encode_chunked)
    ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/http/client.py", line 1384, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
    ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/http/client.py", line 1333, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
    ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/http/client.py", line 1093, in _send_output
    self.send(msg)
    ~~~~~~~~~^^^^^
  File "/usr/local/lib/python3.13/http/client.py", line 1057, in send
    self.sock.sendall(data)
    ~~~~~~~~~~~~~~~~~^^^^^^
  File "/usr/local/lib/python3.13/ssl.py", line 1263, in sendall
    v = self.send(byte_view[count:])
  File "/usr/local/lib/python3.13/ssl.py", line 1232, in send
    return self._sslobj.write(data)
           ~~~~~~~~~~~~~~~~~~^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
02:13:23 PM
Error
Encountered an exception while waiting for job run completion - [Errno 32] Broken pipe