# data-tricks-and-tips
s
I am trying to run a deployment via Docker. However, I receive the following error:
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows'
This is surprising because I don't use that location to reference the flow. This is what my Dockerfile looks like:
FROM python:3.8-slim-buster

ARG PREFECT_API_KEY
ENV PREFECT_API_KEY=$PREFECT_API_KEY

ARG PREFECT_API_URL
ENV PREFECT_API_URL=$PREFECT_API_URL

ARG GCP_DATASET_NAME
ENV GCP_DATASET_NAME=$GCP_DATASET_NAME

ARG GCP_DATASET_TABLE_NAME
ENV GCP_DATASET_TABLE_NAME=$GCP_DATASET_TABLE_NAME

ARG GCP_PROJECT_ID
ENV GCP_PROJECT_ID=$GCP_PROJECT_ID

ARG GCP_REGION
ENV GCP_REGION=$GCP_REGION

COPY poetry.lock .
COPY pyproject.toml .

RUN pip install poetry --trusted-host pypi.python.org --no-cache-dir
RUN poetry config virtualenvs.create false
RUN poetry install --no-root --without dev


RUN mkdir scripts
COPY scripts/ scripts

RUN mkdir config
COPY config/ config

RUN mkdir -p dbt/xetra
COPY dbt/xetra dbt/xetra
Any idea why Prefect is looking for the flow in the /opt/prefect/flows directory? I am running this via the Cloud
a
Hey @sundeep! Are you using an infrastructure block or a work pool for your deployment?
r
@sundeep I’ve had similar issues setting up my stack (running flows on AWS with a GitHub infra block). Not sure if it’s relevant to a Docker deployment, but as far as I can tell, Prefect checks out my block into /opt/prefect/ and then automatically looks for flows in /opt/prefect/flows. Specifying path=<flow_dir> on my deployments was enough for Prefect to find them. Prefect folks reading, please note that I couldn’t find any documentation on how and where Prefect manages directories inside containers.
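For readers following along, here is a minimal sketch of what setting `path` on a deployment might look like with the Prefect 2 `Deployment.build_from_flow` API. The deployment name and the `scripts/` directory are assumptions for illustration, not values confirmed in this thread; `flow` and `infrastructure` stand for your own flow object and loaded infrastructure block.

```python
from typing import Any, Dict


def deployment_options(flow_dir: str) -> Dict[str, Any]:
    """Collect deployment settings with `path` set explicitly.

    `flow_dir` is the directory holding the flow code inside the container
    (assumed value; match it to the COPY destination in your image).
    """
    return {
        "name": "docker-deployment",  # hypothetical deployment name
        "path": flow_dir,             # where Prefect should look for the flow code
    }


def build_deployment(flow: Any, infrastructure: Any) -> Any:
    """Build the deployment object; needs Prefect 2.x installed when called."""
    from prefect.deployments import Deployment  # imported lazily

    return Deployment.build_from_flow(
        flow=flow,
        infrastructure=infrastructure,
        **deployment_options("scripts/"),
    )
```

Keeping the options in a plain dictionary makes it easy to check the value of `path` before anything is registered with the API.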
s
@alex I am using an infrastructure block
@Robert Banick Let me try that. Thank you!
@Robert Banick that was indeed the issue. Thanks! However, I have run into another error. Since my flow references the GCP Credentials block inside the container, this piece of code throws the following error:
@task(log_prints=True, name="list files from gcs")
def list_gcs(credentials, bucket_name: str, prefix: str, delimiter: str) -> Iterator[str]:
    """Loop through files in GCS directory"""
    storage_client = storage.Client(credentials=credentials, project=credentials.project_id)
    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter)
    for blob in blobs:
        yield blob.name
Error
09:34:43.973 | INFO    | Flow run 'burrowing-parakeet' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
09:34:43.910 | ERROR   | Flow run 'attentive-basilisk' - Encountered exception during execution:
09:34:44.418 | ERROR   | Flow run 'attentive-basilisk' - Finished in state Failed("Flow run encountered an exception. gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401\n")
I don't see this issue when I run the flow locally. Do I need to copy the credentials file into the Docker image? I am wary of this because the image is published to the hub. The example provided here doesn't copy any credentials into the Docker image: https://github.com/discdiver/prefect-zoomcamp Any idea how to resolve this? I appreciate your help.
r
Looks like a GCFS issue, sorry I don’t have any experience with this
s
No worries. Thanks!
@alex, do you have any idea how to resolve this?
a
You can use a GcpCredentials block to retrieve credentials in your flow. Here’s an example of how to create those blocks: https://github.com/discdiver/prefect-zoomcamp/blob/main/blocks/make_gcp_blocks.py
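A minimal sketch of that suggestion, assuming the `prefect-gcp` package is installed and a `GcpCredentials` block has already been saved. The block name passed in is hypothetical; use whatever name your block was saved under.

```python
from typing import Any


def load_gcp_credentials(block_name: str) -> Any:
    """Load a saved GcpCredentials block and return google-auth credentials.

    `block_name` is the name the block was saved under (hypothetical here).
    """
    from prefect_gcp import GcpCredentials  # imported lazily; needs prefect-gcp

    gcp_block = GcpCredentials.load(block_name)
    # Converts the stored service account info into a credentials object
    # that Google client libraries accept.
    return gcp_block.get_credentials_from_service_account()
```

The returned object can then be passed to clients such as `storage.Client(credentials=...)`, so no key file has to be copied into the image.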
s
@alex I have a GCP credentials block and also a JSON block to retrieve all the relevant credentials and env variables, but it doesn't help.
It fails while reading the parquet df:
@task
def gcs_to_bq(file: str, 
              credentials,
              bucket_name: str, 
              gcp_dataset_name: str, 
              gcp_dataset_table: str) -> None:
    """Loading data to BigQuery"""
    # script runs but no data in BigQuery

    # setting google client
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)
    try:
        df = pd.read_parquet(f"gs://{bucket_name}/{file}")
        df["key"] = df["ISIN"].fillna("") + "_" + df["Date"].astype(str).fillna("") + "_" + df["Time"].astype(str).fillna("")
        df["key"] = df["key"].str.lower().str.encode('utf-8').apply(sha)
        print(f"Number of rows {df.shape}")
        table_schema=[
            {
                "name": "key",
                "type": "STRING",
            },
            {
                "name": "ISIN",
                "type": "STRING",
            },
            {    
                "name": "Mnemonic",
                "type": "STRING",
            },
            {
                "name": "SecurityDesc",
                "type": "STRING",
            },
            {
                "name": "SecurityType",
                "type": "STRING",
            },
            {
                "name": "Currency",
                "type": "STRING",
            },
            {
                "name": "SecurityID",
                "type": "INT64",
            },
            {
                "name": "Date",
                "type": "TIMESTAMP",
            },
            {
                "name": "Time",
                "type": "STRING",
            },
            {
                "name": "StartPrice",
                "type": "FLOAT64",
            },
            {
                "name": "MaxPrice",
                "type": "FLOAT64",
            },
            {
                "name": "MinPrice",
                "type": "FLOAT64",
            },
            {
                "name": "EndPrice",
                "type": "FLOAT64",
            },
            {
                "name": "TradedVolume",
                "type": "INT64",
            },
            {
                "name": "NumberOfTrades",
                "type": "INT64",
            }
        ]
        job_config = bigquery.LoadJobConfig()
        job = client.load_table_from_dataframe(
                    df, 
                    f"{gcp_dataset_name}.{gcp_dataset_table}", 
                    job_config=job_config
        )
        job.result()
    except FileNotFoundError:
        pass
    except pyarrow.lib.ArrowTypeError:
        pass
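One possible explanation, offered as a sketch rather than a confirmed fix: the `credentials` argument is handed to `bigquery.Client`, but `pd.read_parquet` on a `gs://` path opens its own gcsfs session (the `gcsfs.retry.HttpError` reported earlier points that way) and never sees those credentials, so inside the container it can end up as an anonymous caller. The helper names below are hypothetical; the `storage_options` parameter of `pd.read_parquet` and the `token` option of gcsfs are existing features.

```python
from typing import Any, Dict


def gcs_storage_options(credentials: Any) -> Dict[str, Any]:
    """Build fsspec/gcsfs options that carry explicit credentials.

    gcsfs accepts a google-auth credentials object as `token`.
    """
    return {"token": credentials}


def read_parquet_from_gcs(bucket_name: str, file: str, credentials: Any) -> Any:
    """Read gs://<bucket_name>/<file> using the supplied credentials."""
    import pandas as pd  # imported lazily; needs pandas >= 1.2 and gcsfs

    return pd.read_parquet(
        f"gs://{bucket_name}/{file}",
        storage_options=gcs_storage_options(credentials),
    )
```

Inside `gcs_to_bq`, the `pd.read_parquet(...)` call could then be swapped for `read_parquet_from_gcs(bucket_name, file, credentials)`, reusing the same credentials object that `bigquery.Client` receives.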
Here is the stack trace
00:21:16.666 | INFO    | prefect.agent - Submitting flow run 'c267fe95-6940-4346-9e2d-f1791df6a3bd'
00:21:18.360 | INFO    | prefect.infrastructure.docker-container - Pulling image 'sl02/xetra:latest'...
00:21:21.409 | INFO    | prefect.infrastructure.docker-container - Creating Docker container 'nickel-crow'...
00:21:21.454 | INFO    | prefect.infrastructure.docker-container - Docker container 'nickel-crow' has status 'created'
00:21:21.746 | INFO    | prefect.infrastructure.docker-container - Docker container 'nickel-crow' has status 'running'
00:21:22.039 | INFO    | prefect.agent - Completed submission of flow run 'c267fe95-6940-4346-9e2d-f1791df6a3bd'
/usr/local/lib/python3.8/runpy.py:127: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
18:51:28.193 | INFO    | Flow run 'nickel-crow' - Downloading flow code from storage at 'scripts/'
18:51:31.673 | INFO    | Flow run 'nickel-crow' - Created task run 'dataset_load_check-0' for task 'dataset_load_check'
18:51:31.675 | INFO    | Flow run 'nickel-crow' - Executing 'dataset_load_check-0' immediately...
18:51:33.414 | INFO    | Task run 'dataset_load_check-0' - Finished in state Completed()
18:51:34.990 | INFO    | Flow run 'nickel-crow' - Created subflow run 'cordial-elephant' for flow 'Pipeline to read files from GCS and load to BigQuery'
18:51:36.587 | INFO    | Flow run 'cordial-elephant' - Created task run 'list files from gcs-0' for task 'list files from gcs'
18:51:36.588 | INFO    | Flow run 'cordial-elephant' - Executing 'list files from gcs-0' immediately...
18:51:37.444 | INFO    | Task run 'list files from gcs-0' - list_gcs <google.oauth2.service_account.Credentials object at 0x7fb9794def70>
18:51:38.392 | INFO    | Task run 'list files from gcs-0' - Finished in state Completed()
18:51:38.394 | INFO    | Flow run 'cordial-elephant' - file: data/xetra/2022-04-05/2022-04-05_BINS_XETR07.parquet
18:51:38.742 | INFO    | Flow run 'cordial-elephant' - Created task run 'gcs_to_bq-0' for task 'gcs_to_bq'
18:51:38.743 | INFO    | Flow run 'cordial-elephant' - Executing 'gcs_to_bq-0' immediately...
18:51:39.464 | INFO    | Task run 'gcs_to_bq-0' - Inside gcs_to_bq xetra-ds data/xetra/2022-04-05/2022-04-05_BINS_XETR07.parquet xetra_ds xetra_stocks
18:51:42.668 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
18:51:42.670 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: [Errno 111] Connection refused
18:51:42.672 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 111] Connection refused
18:51:42.673 | WARNING | google.auth._default - Authentication failed using Compute Engine authentication due to unavailable metadata server.
18:51:42.680 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e3640>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:42.686 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e3ac0>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:42.691 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e3d90>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:42.696 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e5790>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:42.701 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e5670>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:43.083 | INFO    | Task run 'gcs_to_bq-0' - Traceback (most recent call last):
18:51:43.084 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
    return await func(*args, **kwargs)
18:51:43.086 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
    validate_response(status, contents, path, args)
18:51:43.088 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
    raise HttpError(error)
18:51:43.089 | INFO    | Task run 'gcs_to_bq-0' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.082 | ERROR   | gcsfs - _request non-retriable exception: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.094 | INFO    | Task run 'gcs_to_bq-0' - Traceback (most recent call last):
18:51:43.095 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
18:51:43.096 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
18:51:43.097 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
18:51:43.098 | INFO    | Task run 'gcs_to_bq-0' -   File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
    df = pd.read_parquet(f"gs://{bucket_name}/{file}")
18:51:43.099 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
    return impl.read(
18:51:43.100 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
    pa_table = self.api.parquet.read_table(
18:51:43.101 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
    dataset = _ParquetDatasetV2(
18:51:43.102 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
    finfo = filesystem.get_file_info(path_or_paths)
18:51:43.103 | INFO    | Task run 'gcs_to_bq-0' -   File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
18:51:43.104 | INFO    | Task run 'gcs_to_bq-0' -   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
18:51:43.104 | INFO    | Task run 'gcs_to_bq-0' -   File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
18:51:43.105 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
    info = self.fs.info(path)
18:51:43.106 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
    return sync(self.loop, func, *args, **kwargs)
18:51:43.107 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
    raise return_result
18:51:43.108 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
    result[0] = await coro
18:51:43.109 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
    exact = await self._get_object(path)
18:51:43.110 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
    res = await self._call(
18:51:43.111 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
    status, headers, info, contents = await self._request(
18:51:43.111 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
    return await caller(func, *(extras + args), **kw)
18:51:43.112 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
    raise e
18:51:43.113 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
    return await func(*args, **kwargs)
18:51:43.114 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
    validate_response(status, contents, path, args)
18:51:43.115 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
    raise HttpError(error)
18:51:43.116 | INFO    | Task run 'gcs_to_bq-0' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.117 | INFO    | Task run 'gcs_to_bq-0' - Traceback (most recent call last):
18:51:43.118 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
18:51:43.119 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
18:51:43.120 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
18:51:43.121 | INFO    | Task run 'gcs_to_bq-0' -   File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
    df = pd.read_parquet(f"gs://{bucket_name}/{file}")
18:51:43.122 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
    return impl.read(
18:51:43.123 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
    pa_table = self.api.parquet.read_table(
18:51:43.124 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
    dataset = _ParquetDatasetV2(
18:51:43.125 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
    finfo = filesystem.get_file_info(path_or_paths)
18:51:43.126 | INFO    | Task run 'gcs_to_bq-0' -   File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
18:51:43.127 | INFO    | Task run 'gcs_to_bq-0' -   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
18:51:43.127 | INFO    | Task run 'gcs_to_bq-0' -   File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
18:51:43.128 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
    info = self.fs.info(path)
18:51:43.129 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
    return sync(self.loop, func, *args, **kwargs)
18:51:43.130 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
    raise return_result
18:51:43.130 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
    result[0] = await coro
18:51:43.131 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
    exact = await self._get_object(path)
18:51:43.132 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
    res = await self._call(
18:51:43.133 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
    status, headers, info, contents = await self._request(
18:51:43.133 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
    return await caller(func, *(extras + args), **kw)
18:51:43.134 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
    raise e
18:51:43.135 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
    return await func(*args, **kwargs)
18:51:43.136 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
    validate_response(status, contents, path, args)
18:51:43.136 | INFO    | Task run 'gcs_to_bq-0' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
    raise HttpError(error)
18:51:43.137 | INFO    | Task run 'gcs_to_bq-0' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.092 | ERROR   | Task run 'gcs_to_bq-0' - Encountered exception during execution:
18:51:43.545 | ERROR   | Task run 'gcs_to_bq-0' - Finished in state Failed("Task run encountered an exception: gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401\n")
18:51:43.549 | INFO    | Flow run 'nickel-crow' - Traceback (most recent call last):
18:51:43.550 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
18:51:43.551 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
18:51:43.552 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
18:51:43.553 | INFO    | Flow run 'nickel-crow' -   File "scripts/gcs_to_bq.py", line 265, in pipeline
    gcs_to_bq(file, credentials, bucket_name, gcp_dataset_name, gcp_dataset_table)
18:51:43.554 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 485, in __call__
    return enter_task_run_engine(
18:51:43.555 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
18:51:43.556 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
18:51:43.556 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
18:51:43.557 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
18:51:43.558 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
18:51:43.559 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
18:51:43.560 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
    return await future._result()
18:51:43.561 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
18:51:43.562 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
18:51:43.563 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
18:51:43.564 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
18:51:43.565 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
18:51:43.566 | INFO    | Flow run 'nickel-crow' -   File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
    df = pd.read_parquet(f"gs://{bucket_name}/{file}")
18:51:43.566 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
    return impl.read(
18:51:43.567 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
    pa_table = self.api.parquet.read_table(
18:51:43.568 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
    dataset = _ParquetDatasetV2(
18:51:43.569 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
    finfo = filesystem.get_file_info(path_or_paths)
18:51:43.570 | INFO    | Flow run 'nickel-crow' -   File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
18:51:43.571 | INFO    | Flow run 'nickel-crow' -   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
18:51:43.572 | INFO    | Flow run 'nickel-crow' -   File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
18:51:43.572 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
    info = self.fs.info(path)
18:51:43.573 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
    return sync(self.loop, func, *args, **kwargs)
18:51:43.574 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
    raise return_result
18:51:43.575 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
    result[0] = await coro
18:51:43.576 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
    exact = await self._get_object(path)
18:51:43.577 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
    res = await self._call(
18:51:43.578 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
    status, headers, info, contents = await self._request(
18:51:43.578 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
    return await caller(func, *(extras + args), **kw)
18:51:43.579 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
    raise e
18:51:43.580 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
    return await func(*args, **kwargs)
18:51:43.581 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
    validate_response(status, contents, path, args)
18:51:43.582 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
    raise HttpError(error)
18:51:43.583 | INFO    | Flow run 'nickel-crow' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.585 | INFO    | Flow run 'nickel-crow' - Traceback (most recent call last):
18:51:43.586 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
18:51:43.587 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
18:51:43.588 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
18:51:43.589 | INFO    | Flow run 'nickel-crow' -   File "scripts/gcs_to_bq.py", line 265, in pipeline
    gcs_to_bq(file, credentials, bucket_name, gcp_dataset_name, gcp_dataset_table)
18:51:43.590 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 485, in __call__
    return enter_task_run_engine(
18:51:43.590 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
18:51:43.591 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
18:51:43.592 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
18:51:43.593 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
18:51:43.593 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
18:51:43.594 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
18:51:43.595 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
    return await future._result()
18:51:43.596 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
18:51:43.597 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
18:51:43.597 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
18:51:43.598 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
18:51:43.599 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
18:51:43.600 | INFO    | Flow run 'nickel-crow' -   File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
    df = pd.read_parquet(f"gs://{bucket_name}/{file}")
18:51:43.600 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
    return impl.read(
18:51:43.601 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
    pa_table = self.api.parquet.read_table(
18:51:43.602 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
    dataset = _ParquetDatasetV2(
18:51:43.603 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
    finfo = filesystem.get_file_info(path_or_paths)
18:51:43.604 | INFO    | Flow run 'nickel-crow' -   File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
18:51:43.604 | INFO    | Flow run 'nickel-crow' -   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
18:51:43.605 | INFO    | Flow run 'nickel-crow' -   File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
18:51:43.606 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
    info = self.fs.info(path)
18:51:43.606 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
    return sync(self.loop, func, *args, **kwargs)
18:51:43.607 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
    raise return_result
18:51:43.608 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
    result[0] = await coro
18:51:43.609 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
    exact = await self._get_object(path)
18:51:43.609 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
    res = await self._call(
18:51:43.610 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
    status, headers, info, contents = await self._request(
18:51:43.611 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
    return await caller(func, *(extras + args), **kw)
18:51:43.612 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
    raise e
18:51:43.612 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
    return await func(*args, **kwargs)
18:51:43.613 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
    validate_response(status, contents, path, args)
18:51:43.614 | INFO    | Flow run 'nickel-crow' -   File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
    raise HttpError(error)
18:51:43.615 | INFO    | Flow run 'nickel-crow' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.548 | ERROR   | Flow run 'cordial-elephant' - Encountered exception during execution:
18:51:44.073 | ERROR   | Flow run 'cordial-elephant' - Finished in state Failed("Flow run encountered an exception. gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401\n")
18:51:44.075 | ERROR   | Flow run 'nickel-crow' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "scripts/gcs_to_bq.py", line 305, in main
    num = pipeline(bucket_name, prefix, gcp_dataset_name, gcp_dataset_table)
  File "/usr/local/lib/python3.8/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 184, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/usr/local/lib/python3.8/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 566, in create_and_begin_subflow_run
    return await terminal_state.result(fetch=True)
  File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "scripts/gcs_to_bq.py", line 265, in pipeline
    gcs_to_bq(file, credentials, bucket_name, gcp_dataset_name, gcp_dataset_table)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 485, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
    df = pd.read_parquet(f"gs://{bucket_name}/{file}")
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
    return impl.read(
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
    pa_table = self.api.parquet.read_table(
  File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
    dataset = _ParquetDatasetV2(
  File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
    finfo = filesystem.get_file_info(path_or_paths)
  File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
  File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
    info = self.fs.info(path)
  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
    raise return_result
  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
    result[0] = await coro
  File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
    exact = await self._get_object(path)
  File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
    res = await self._call(
  File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
    status, headers, info, contents = await self._request(
  File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
    return await caller(func, *(extras + args), **kw)
  File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
    raise e
  File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
    return await func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
    validate_response(status, contents, path, args)
  File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
    raise HttpError(error)
gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:44.571 | ERROR   | Flow run 'nickel-crow' - Finished in state Failed("Flow run encountered an exception. gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401\n")
k
@sundeep your service account or principal is missing the storage.objects.get permission for that bucket.
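Also worth checking: the error says "Anonymous caller", which usually means the request reached GCS with no credentials attached at all. The traceback shows pd.read_parquet being called with only the gs:// path, so gcsfs has to find credentials on its own inside the container. A minimal sketch of passing them explicitly, assuming you can get the service account JSON as a dict (the helper names here are made up for illustration, not from your script):

```python
from typing import Any, Dict


def gcs_storage_options(service_account_info: Dict[str, Any]) -> Dict[str, Any]:
    """Build the storage_options dict that pandas forwards to gcsfs.

    gcsfs accepts the parsed service-account JSON (a dict) as its `token`.
    """
    return {"token": service_account_info}


def read_parquet_from_gcs(bucket_name: str, file: str,
                          service_account_info: Dict[str, Any]):
    """Read one parquet object from GCS using explicit credentials."""
    # Imported lazily so the helper above has no third-party dependencies.
    import pandas as pd

    return pd.read_parquet(
        f"gs://{bucket_name}/{file}",
        storage_options=gcs_storage_options(service_account_info),
    )
```

In your task that would replace the bare pd.read_parquet(f"gs://{bucket_name}/{file}") call. How you pull the service account dict out of the GCP Credentials block depends on your prefect-gcp version, so treat that part as something to confirm in its docs. If the 401 turns into a 403 after this, then it really is the missing storage.objects.get permission on the bucket.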