Hi everyone, all my Prefect runs are failing with...
# ask-community
n
Hi everyone, all my Prefect runs are failing with the same error when interacting with any Google Python API (e.g. google-cloud-secret-manager, google-cloud-logging, etc.). All the Google Python packages are running into a grpc transport error. It may be a package version issue, or maybe some conflict.
I have the grpcio package in my Dockerfile and build.py, and it's being installed during the build. But I still see this error at every run on the lines of code where interaction with any Google Python package happens.
Do you know what might be causing this?
k
Hi @Nivi Mukka, are you mapping with a Google client? Is this stuff from the Prefect task library or your own code? Does this code work if you don't use it inside a @task? You could do task.run() to test.
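Something like this minimal sketch (the task name here is a placeholder, not from your code):
Copy code
from prefect import task
from google.cloud import secretmanager

@task
def fetch_secret():
    client = secretmanager.SecretManagerServiceClient.from_service_account_json('service_account_key.json')
    return client.access_secret_version(name="projects/xxxxxxxxx/secrets/xxxxxxx/versions/latest")

# .run() calls the underlying function directly, with no Flow or executor involved
result = fetch_secret.run()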
n
Hi @Kevin Kho, yes, the code works when I run it in my AI Platform Notebook on GCP (with all the same package versions as in Prefect's Docker image).
k
What executor are you using?
n
See the code snippet in build.py where it fails:
Copy code
from google.cloud import secretmanager

secret_client = secretmanager.SecretManagerServiceClient.from_service_account_json('service_account_key.json')
response = secret_client.access_secret_version(name="projects/xxxxxxxxx/secrets/xxxxxxx/versions/latest")
I am using DaskExecutor.
k
Is this outside your Flow code?
Could you try using a LocalExecutor to see if there is an error? That would help confirm the issue.
Actually, sorry, what Storage are you using as well?
n
The code snippet I shared is from my build.py script, but in the parts of the Flow code where interaction with Google's Python APIs happens, it fails with the same error. I am using Docker storage.
I am actually not sure how to run this Prefect Flow locally. Could you share some pointers/docs?
k
So I suspect this error happens because the secret_client is being defined outside the Flow code. Could you try creating the client inside the task that uses it instead, to defer the execution to Flow run time? And then you can also re-register with flow.executor = LocalExecutor() instead of flow.executor = DaskExecutor() first. Apply both of these changes, re-register, then re-run, and we'll see what happens.
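Roughly, the two changes together would look like this sketch (the flow and task names are placeholders):
Copy code
from prefect import Flow, task
from prefect.executors import LocalExecutor

@task
def get_secret():
    # Client is created at Flow *run* time on the worker, not at registration/build time
    from google.cloud import secretmanager
    client = secretmanager.SecretManagerServiceClient.from_service_account_json('service_account_key.json')
    return client.access_secret_version(name="projects/xxxxxxxxx/secrets/xxxxxxx/versions/latest")

with Flow("xray-flow") as flow:
    secret = get_secret()

flow.executor = LocalExecutor()  # swap back to DaskExecutor() once this is confirmed
# For a quick local test without registering, you can also call flow.run()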
👍 1
n
I will try the LocalExecutor, but I don't think defining it outside of the Flow code is the issue, because I tried removing that whole code snippet from build.py and it fails with the same error for my Google logging API interaction, which is all inside the Flow code.
Copy code
Unexpected error: TypeError("secure_channel() got an unexpected keyword argument 'default_scopes'")
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 860, in get_task_run_state
    logger=self.logger,
  File "/opt/conda/lib/python3.7/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/jupyter/SW_XRAY/flows/xray_flow.py", line 24, in bug_info_flow
  File "/home/dask/src/ds/grace/bug_info/bug_info.py", line 33, in main
    write_log_entry(logging_client=log_client, logger_name=logger, log_text='Job Started')
  File "/home/dask/utils/log_utils.py", line 20, in write_log_entry
    logger.log_text(log_text, severity=severity)
  File "/opt/conda/lib/python3.7/site-packages/google/cloud/logging_v2/logger.py", line 165, in log_text
    self._do_log(client, TextEntry, text, **kw)
  File "/opt/conda/lib/python3.7/site-packages/google/cloud/logging_v2/logger.py", line 134, in _do_log
    client.logging_api.write_entries([api_repr])
  File "/opt/conda/lib/python3.7/site-packages/google/cloud/logging_v2/client.py", line 154, in logging_api
    self._logging_api = _gapic.make_logging_api(self)
  File "/opt/conda/lib/python3.7/site-packages/google/cloud/logging_v2/_gapic.py", line 524, in make_logging_api
    client_options=client._client_options,
  File "/opt/conda/lib/python3.7/site-packages/google/cloud/logging_v2/services/logging_service_v2/client.py", line 356, in __init__
    or Transport == type(self).get_transport_class("grpc_asyncio")
  File "/opt/conda/lib/python3.7/site-packages/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py", line 169, in __init__
    ("grpc.max_receive_message_length", -1),
  File "/opt/conda/lib/python3.7/site-packages/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py", line 220, in create_channel
    **kwargs,
  File "/opt/conda/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 288, in create_channel
    return grpc.secure_channel(target, composite_credentials, **kwargs)
TypeError: secure_channel() got an unexpected keyword argument 'default_scopes'
The error above is from code that is inside a task in the Flow.
k
I see what you mean. Where is this grpc client connection coming from, then, if you deleted the earlier code block?
n
The same grpc transport error comes up at every line of code that uses any Google Python API package. The latest error message I sent is from within a task inside the Flow, where it is trying to write logs to GCP Stackdriver.
k
I remember I saw an issue here where someone was also trying to write logs to GCP. It was a different error, but the cause was that the logger gets serialized by cloudpickle, doesn't keep its Handler information, and is then re-instantiated in Dask. Would you be able to share a sample task where you see this?
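To illustrate the mechanism with a plain sketch (not your code): pickling a standard-library logger only records its name, so a remote worker rebuilds it via getLogger() without the handlers that were attached on the driver.
Copy code
import logging

import cloudpickle

logger = logging.getLogger("my_app")
logger.addHandler(logging.StreamHandler())

# Logger.__reduce__ stores just the name; loading calls getLogger(name)
blob = cloudpickle.dumps(logger)

# In this same process, getLogger returns the identical object...
print(cloudpickle.loads(blob) is logger)  # True
# ...but on a Dask worker it builds a fresh "my_app" logger with no
# handlers attached, so handler configuration is silently lost.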
n
These are my utility functions, which are being added into Prefect's Docker storage files:
Copy code
import os

from google.cloud import logging


def create_logging_client():
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = \
        'secrets/xxxxxxxx.json'
    logging_client = logging.Client()
    return logging_client


def write_log_entry(logging_client, logger_name, log_text, severity='INFO'):
    logger = logging_client.logger(logger_name)
    logger.log_text(log_text, severity=severity)

    print('Wrote logs to {}: "{}"'.format(logger.name, log_text))
This is the code in the task where it fails:
Copy code
log_client = create_logging_client()
logger = 'xxxxxxx'
write_log_entry(logging_client=log_client, logger_name=logger, log_text='Job Started')
k
Ok, everything looks good here. My bad for derailing us. You are absolutely right, this seems like a package mismatch, where the package providing grpc might be behind, and that's why the keyword argument is throwing an error.
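One way to narrow it down (just a sketch; which package is the culprit is an assumption): the traceback shows default_scopes being passed down into google-api-core's grpc_helpers, so comparing that stack's versions between the notebook and the image should show the drift.
Copy code
# Run this in both the notebook and the container and diff the output
import google.api_core
import google.auth
import grpc

print("google-api-core:", google.api_core.__version__)
print("google-auth:", google.auth.__version__)
print("grpcio:", grpc.__version__)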
n
No worries, thanks for that validation. Do you know how I can go about fixing the issue? The Dockerfile has the same package versions as the ones that work in GCP's AI Platform Notebook.
k
Are you using DockerRun or KubernetesRun?
n
I think KubernetesRun? I'm not sure. Here are some details: Prefect and Dask are set up on Google Kubernetes Engine (GKE). The Docker image, once built, gets stored on Google Container Registry (GCR), which is where the Prefect Flow pulls it from.
Also, it is a Kubernetes Agent.
k
So I would check that the image being pulled is up to date. The default imagePullPolicy for Kubernetes is IfNotPresent (unless the tag is :latest), so it doesn't pull a new version of the image if it already has one with the same name and tag. If you had this error and then updated the versions, there is a chance you are still using the old image. You can make sure the latest image is pulled with:
Copy code
flow.run_config = KubernetesRun(image_pull_policy="Always")
inside the Flow code. Other than that, I guess you can log the package versions in a task before the failing tasks, so you can compare what the flow actually has.
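A sketch of what that diagnostic task could look like (the package list is a guess based on the traceback):
Copy code
import pkg_resources
import prefect
from prefect import task

@task
def log_package_versions():
    logger = prefect.context.get("logger")
    # Log what the flow's runtime environment actually has installed
    for pkg in ["grpcio", "google-api-core", "google-auth", "google-cloud-logging"]:
        try:
            logger.info("%s==%s", pkg, pkg_resources.get_distribution(pkg).version)
        except pkg_resources.DistributionNotFound:
            logger.info("%s not installed", pkg)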
👍 1