sundeep
04/29/2023, 4:46 PM
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows'
Which is surprising because I don't use the above location to reference the flow. This is what my Dockerfile looks like:
FROM python:3.8-slim-buster
ARG PREFECT_API_KEY
ENV PREFECT_API_KEY=$PREFECT_API_KEY
ARG PREFECT_API_URL
ENV PREFECT_API_URL=$PREFECT_API_URL
ARG GCP_DATASET_NAME
ENV GCP_DATASET_NAME=$GCP_DATASET_NAME
ARG GCP_DATASET_TABLE_NAME
ENV GCP_DATASET_TABLE_NAME=$GCP_DATASET_TABLE_NAME
ARG GCP_PROJECT_ID
ENV GCP_PROJECT_ID=$GCP_PROJECT_ID
ARG GCP_REGION
ENV GCP_REGION=$GCP_REGION
COPY poetry.lock .
COPY pyproject.toml .
RUN pip install poetry --trusted-host pypi.python.org --no-cache-dir
RUN poetry config virtualenvs.create false
RUN poetry install --no-root --without dev
RUN mkdir scripts
COPY scripts/ scripts
RUN mkdir config
COPY config/ config
RUN mkdir -p dbt/xetra
COPY dbt/xetra dbt/xetra
Any idea why Prefect is looking for the flow in the /opt/prefect/flows directory?
I am running this via the Cloud
alex
05/01/2023, 1:15 PM
Robert Banick
05/01/2023, 3:10 PM
/opt/prefect/ and then auto-looks for /opt/prefect/flows. Specifying path=<flow_dir> on my deployments was enough for Prefect to find them.
Prefect folks reading, please note that I couldn’t find any documentation on how and where Prefect manages directories inside containers.
sundeep
05/01/2023, 4:57 PM
sundeep
05/01/2023, 4:57 PM
sundeep
05/02/2023, 3:43 PM
from typing import Iterator

from google.cloud import storage
from prefect import task


@task(log_prints=True, name="list files from gcs")
def list_gcs(credentials, bucket_name: str, prefix: str, delimiter: str) -> Iterator[str]:
    """Loop through files in GCS directory"""
    storage_client = storage.Client(credentials=credentials, project=credentials.project_id)
    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter)
    for blob in blobs:
        yield blob.name
sundeep
05/02/2023, 3:44 PM
09:34:43.973 | INFO | Flow run 'burrowing-parakeet' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
09:34:43.910 | ERROR | Flow run 'attentive-basilisk' - Encountered exception during execution:
09:34:44.418 | ERROR | Flow run 'attentive-basilisk' - Finished in state Failed("Flow run encountered an exception. gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401\n")
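A note on the error above: `Anonymous caller` in a `gcsfs.retry.HttpError` usually means the request reached GCS without any credentials attached. One possible cause, assuming the flow reads a `gs://` path through pandas, is that pandas hands the read to gcsfs, which looks up credentials on its own unless they are passed in. The sketch below shows one way to pass them explicitly through `storage_options`. The helper names `gcs_storage_options` and `read_parquet_from_gcs` are hypothetical; `token` is the gcsfs parameter that accepts a credentials object or a path to a service-account key file.

```python
from typing import Any, Dict


def gcs_storage_options(credentials: Any) -> Dict[str, Any]:
    """Build the storage_options dict that pandas forwards to gcsfs.

    `credentials` may be a google.auth credentials object or a path to a
    service-account JSON key file; gcsfs accepts either as `token`.
    """
    if credentials is None:
        # Fail early instead of silently falling back to anonymous access.
        raise ValueError("credentials are required for authenticated GCS reads")
    return {"token": credentials}


def read_parquet_from_gcs(bucket_name: str, file: str, credentials: Any):
    """Read one parquet object from GCS using explicit credentials."""
    # Imported lazily so the helper above stays importable without pandas.
    import pandas as pd

    return pd.read_parquet(
        f"gs://{bucket_name}/{file}",
        storage_options=gcs_storage_options(credentials),
    )
```

Inside a task this would be called with the same credentials object that is already handed to the Google client, so that both the GCS read and the BigQuery load authenticate the same way.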
sundeep
05/02/2023, 3:46 PM
Robert Banick
05/02/2023, 3:49 PM
sundeep
05/02/2023, 3:50 PM
sundeep
05/02/2023, 3:50 PM
alex
05/02/2023, 5:53 PM
GcpCredentials block to retrieve credentials in your flow. Here’s an example of how to create those blocks: https://github.com/discdiver/prefect-zoomcamp/blob/main/blocks/make_gcp_blocks.py
sundeep
05/02/2023, 6:14 PM
sundeep
05/02/2023, 6:56 PM
import pandas as pd
import pyarrow
from google.cloud import bigquery
from prefect import task

# `sha` is a hashing helper defined elsewhere in the script (not shown here).


@task
def gcs_to_bq(file: str,
              credentials,
              bucket_name: str,
              gcp_dataset_name: str,
              gcp_dataset_table: str) -> None:
    """Loading data to BigQuery"""
    # script runs but no data in BigQuery
    # setting google client
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)
    try:
        df = pd.read_parquet(f"gs://{bucket_name}/{file}")
        df["key"] = df["ISIN"].fillna("") + "_" + df["Date"].astype(str).fillna("") + "_" + df["Time"].astype(str).fillna("")
        df["key"] = df["key"].str.lower().str.encode('utf-8').apply(sha)
        print(f"Number of rows {df.shape}")
        table_schema = [
            {"name": "key", "type": "STRING"},
            {"name": "ISIN", "type": "STRING"},
            {"name": "Mnemonic", "type": "STRING"},
            {"name": "SecurityDesc", "type": "STRING"},
            {"name": "SecurityType", "type": "STRING"},
            {"name": "Currency", "type": "STRING"},
            {"name": "SecurityID", "type": "INT64"},
            {"name": "Date", "type": "TIMESTAMP"},
            {"name": "Time", "type": "STRING"},
            {"name": "StartPrice", "type": "FLOAT64"},
            {"name": "MaxPrice", "type": "FLOAT64"},
            {"name": "MinPrice", "type": "FLOAT64"},
            {"name": "EndPrice", "type": "FLOAT64"},
            {"name": "TradedVolume", "type": "INT64"},
            {"name": "NumberOfTrades", "type": "INT64"},
        ]
        job_config = bigquery.LoadJobConfig()
        job = client.load_table_from_dataframe(
            df,
            f"{gcp_dataset_name}.{gcp_dataset_table}",
            job_config=job_config
        )
        job.result()
    except FileNotFoundError:
        pass
    except pyarrow.lib.ArrowTypeError:
        pass
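The `sha` helper applied to the `key` column is not shown in the thread. As a rough sketch of what such a row key could look like, assuming `sha` returns a SHA-256 hex digest (an assumption; the original may use a different hash), the same concatenation can be written for a single row with only the standard library. The name `make_key` and the sample values are hypothetical.

```python
import hashlib
from typing import Optional


def sha(value: bytes) -> str:
    """Assumed stand-in for the thread's `sha` helper: SHA-256 hex digest."""
    return hashlib.sha256(value).hexdigest()


def make_key(isin: Optional[str], date: object, time: object) -> str:
    """Roughly mirror the DataFrame expression for one row.

    A missing ISIN becomes ""; Date and Time are stringified, the parts are
    joined with "_", lower-cased, UTF-8 encoded and hashed.
    """
    raw = f"{isin or ''}_{date}_{time}"
    return sha(raw.lower().encode("utf-8"))


if __name__ == "__main__":
    # Placeholder values, not taken from the real dataset.
    print(make_key("XX0000000000", "2022-04-05", "07:00"))
```

One detail worth noting about the pandas version: `astype(str)` runs before `fillna("")`, so a missing `Date` or `Time` has already become a string such as `nan` or `NaT` by the time `fillna` is called, and the fill has no effect on those columns.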
sundeep
05/02/2023, 6:58 PM
00:21:16.666 | INFO | prefect.agent - Submitting flow run 'c267fe95-6940-4346-9e2d-f1791df6a3bd'
00:21:18.360 | INFO | prefect.infrastructure.docker-container - Pulling image 'sl02/xetra:latest'...
00:21:21.409 | INFO | prefect.infrastructure.docker-container - Creating Docker container 'nickel-crow'...
00:21:21.454 | INFO | prefect.infrastructure.docker-container - Docker container 'nickel-crow' has status 'created'
00:21:21.746 | INFO | prefect.infrastructure.docker-container - Docker container 'nickel-crow' has status 'running'
00:21:22.039 | INFO | prefect.agent - Completed submission of flow run 'c267fe95-6940-4346-9e2d-f1791df6a3bd'
/usr/local/lib/python3.8/runpy.py:127: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
18:51:28.193 | INFO | Flow run 'nickel-crow' - Downloading flow code from storage at 'scripts/'
18:51:31.673 | INFO | Flow run 'nickel-crow' - Created task run 'dataset_load_check-0' for task 'dataset_load_check'
18:51:31.675 | INFO | Flow run 'nickel-crow' - Executing 'dataset_load_check-0' immediately...
18:51:33.414 | INFO | Task run 'dataset_load_check-0' - Finished in state Completed()
18:51:34.990 | INFO | Flow run 'nickel-crow' - Created subflow run 'cordial-elephant' for flow 'Pipeline to read files from GCS and load to BigQuery'
18:51:36.587 | INFO | Flow run 'cordial-elephant' - Created task run 'list files from gcs-0' for task 'list files from gcs'
18:51:36.588 | INFO | Flow run 'cordial-elephant' - Executing 'list files from gcs-0' immediately...
18:51:37.444 | INFO | Task run 'list files from gcs-0' - list_gcs <google.oauth2.service_account.Credentials object at 0x7fb9794def70>
18:51:38.392 | INFO | Task run 'list files from gcs-0' - Finished in state Completed()
18:51:38.394 | INFO | Flow run 'cordial-elephant' - file: data/xetra/2022-04-05/2022-04-05_BINS_XETR07.parquet
18:51:38.742 | INFO | Flow run 'cordial-elephant' - Created task run 'gcs_to_bq-0' for task 'gcs_to_bq'
18:51:38.743 | INFO | Flow run 'cordial-elephant' - Executing 'gcs_to_bq-0' immediately...
18:51:39.464 | INFO | Task run 'gcs_to_bq-0' - Inside gcs_to_bq xetra-ds data/xetra/2022-04-05/2022-04-05_BINS_XETR07.parquet xetra_ds xetra_stocks
18:51:42.668 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
18:51:42.670 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: [Errno 111] Connection refused
18:51:42.672 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 111] Connection refused
18:51:42.673 | WARNING | google.auth._default - Authentication failed using Compute Engine authentication due to unavailable metadata server.
18:51:42.680 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e3640>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:42.686 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e3ac0>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:42.691 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e3d90>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:42.696 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e5790>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:42.701 | WARNING | google.auth.compute_engine._metadata - Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by
sundeep
05/02/2023, 7:00 PM
NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb9787e5670>: Failed to establish a new connection: [Errno -2] Name or service not known'))
18:51:43.083 | INFO | Task run 'gcs_to_bq-0' - Traceback (most recent call last):
18:51:43.084 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
return await func(*args, **kwargs)
18:51:43.086 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
validate_response(status, contents, path, args)
18:51:43.088 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
raise HttpError(error)
18:51:43.089 | INFO | Task run 'gcs_to_bq-0' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.082 | ERROR | gcsfs - _request non-retriable exception: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.094 | INFO | Task run 'gcs_to_bq-0' - Traceback (most recent call last):
18:51:43.095 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
result = await call.aresult()
18:51:43.096 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
18:51:43.097 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
18:51:43.098 | INFO | Task run 'gcs_to_bq-0' - File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
df = pd.read_parquet(f"gs://{bucket_name}/{file}")
18:51:43.099 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
return impl.read(
18:51:43.100 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
pa_table = self.api.parquet.read_table(
18:51:43.101 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
dataset = _ParquetDatasetV2(
18:51:43.102 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
finfo = filesystem.get_file_info(path_or_paths)
18:51:43.103 | INFO | Task run 'gcs_to_bq-0' - File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
18:51:43.104 | INFO | Task run 'gcs_to_bq-0' - File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
18:51:43.104 | INFO | Task run 'gcs_to_bq-0' - File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
18:51:43.105 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
info = self.fs.info(path)
18:51:43.106 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
return sync(self.loop, func, *args, **kwargs)
18:51:43.107 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
raise return_result
18:51:43.108 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
result[0] = await coro
18:51:43.109 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
exact = await self._get_object(path)
18:51:43.110 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
res = await self._call(
18:51:43.111 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
status, headers, info, contents = await self._request(
18:51:43.111 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
return await caller(func, *(extras + args), **kw)
sundeep
05/02/2023, 7:00 PM
18:51:43.112 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
raise e
18:51:43.113 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
return await func(*args, **kwargs)
18:51:43.114 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
validate_response(status, contents, path, args)
18:51:43.115 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
raise HttpError(error)
18:51:43.116 | INFO | Task run 'gcs_to_bq-0' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.117 | INFO | Task run 'gcs_to_bq-0' - Traceback (most recent call last):
18:51:43.118 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
result = await call.aresult()
18:51:43.119 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
18:51:43.120 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
18:51:43.121 | INFO | Task run 'gcs_to_bq-0' - File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
df = pd.read_parquet(f"gs://{bucket_name}/{file}")
18:51:43.122 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
return impl.read(
18:51:43.123 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
pa_table = self.api.parquet.read_table(
18:51:43.124 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
dataset = _ParquetDatasetV2(
18:51:43.125 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
finfo = filesystem.get_file_info(path_or_paths)
18:51:43.126 | INFO | Task run 'gcs_to_bq-0' - File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
18:51:43.127 | INFO | Task run 'gcs_to_bq-0' - File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
18:51:43.127 | INFO | Task run 'gcs_to_bq-0' - File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
18:51:43.128 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
info = self.fs.info(path)
18:51:43.129 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
return sync(self.loop, func, *args, **kwargs)
18:51:43.130 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
raise return_result
18:51:43.130 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
result[0] = await coro
18:51:43.131 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
exact = await self._get_object(path)
18:51:43.132 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
res = await self._call(
18:51:43.133 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
status, headers, info, contents = await self._request(
18:51:43.133 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
return await caller(func, *(extras + args), **kw)
18:51:43.134 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
raise e
18:51:43.135 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
return await func(*args, **kwargs)
18:51:43.136 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
validate_response(status, contents, path, args)
18:51:43.136 | INFO | Task run 'gcs_to_bq-0' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
raise HttpError(error)
18:51:43.137 | INFO | Task run 'gcs_to_bq-0' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.092 | ERROR | Task run 'gcs_to_bq-0' - Encountered exception during execution:
18:51:43.545 | ERROR | Task run 'gcs_to_bq-0' - Finished in state Failed("Task run encountered an exception: gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401\n")
18:51:43.549 | INFO | Flow run 'nickel-crow' - Traceback (most recent call last):
18:51:43.550 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
result = await flow_call.aresult()
18:51:43.551 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
18:51:43.552 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
18:51:43.553 | INFO | Flow run 'nickel-crow' - File "scripts/gcs_to_bq.py", line 265, in pipeline
gcs_to_bq(file, credentials, bucket_name, gcp_dataset_name, gcp_dataset_table)
18:51:43.554 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 485, in __call__
return enter_task_run_engine(
18:51:43.555 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
18:51:43.556 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
18:51:43.556 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
18:51:43.557 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
18:51:43.558 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
18:51:43.559 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
18:51:43.560 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
return await future._result()
18:51:43.561 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 241, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
18:51:43.562 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
18:51:43.563 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
result = await call.aresult()
18:51:43.564 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
18:51:43.565 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
18:51:43.566 | INFO | Flow run 'nickel-crow' - File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
df = pd.read_parquet(f"gs://{bucket_name}/{file}")
18:51:43.566 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
return impl.read(
18:51:43.567 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
pa_table = self.api.parquet.read_table(
18:51:43.568 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
dataset = _ParquetDatasetV2(
18:51:43.569 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
finfo = filesystem.get_file_info(path_or_paths)
18:51:43.570 | INFO | Flow run 'nickel-crow' - File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
18:51:43.571 | INFO | Flow run 'nickel-crow' - File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
18:51:43.572 | INFO | Flow run 'nickel-crow' - File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
sundeep
05/02/2023, 7:00 PM
18:51:43.572 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
info = self.fs.info(path)
18:51:43.573 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
return sync(self.loop, func, *args, **kwargs)
18:51:43.574 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
raise return_result
18:51:43.575 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
result[0] = await coro
18:51:43.576 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
exact = await self._get_object(path)
18:51:43.577 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
res = await self._call(
18:51:43.578 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
status, headers, info, contents = await self._request(
18:51:43.578 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
return await caller(func, *(extras + args), **kw)
18:51:43.579 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
raise e
18:51:43.580 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
return await func(*args, **kwargs)
18:51:43.581 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
validate_response(status, contents, path, args)
18:51:43.582 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
raise HttpError(error)
18:51:43.583 | INFO | Flow run 'nickel-crow' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.585 | INFO | Flow run 'nickel-crow' - Traceback (most recent call last):
18:51:43.586 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
result = await flow_call.aresult()
18:51:43.587 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
18:51:43.588 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
18:51:43.589 | INFO | Flow run 'nickel-crow' - File "scripts/gcs_to_bq.py", line 265, in pipeline
gcs_to_bq(file, credentials, bucket_name, gcp_dataset_name, gcp_dataset_table)
18:51:43.590 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 485, in __call__
return enter_task_run_engine(
18:51:43.590 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
sundeep
05/02/2023, 7:01 PM
18:51:43.591 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
18:51:43.592 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
18:51:43.593 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
18:51:43.593 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
18:51:43.594 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
18:51:43.595 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
return await future._result()
18:51:43.596 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 241, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
18:51:43.597 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
18:51:43.597 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
result = await call.aresult()
18:51:43.598 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
18:51:43.599 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
18:51:43.600 | INFO | Flow run 'nickel-crow' - File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
df = pd.read_parquet(f"gs://{bucket_name}/{file}")
18:51:43.600 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
return impl.read(
18:51:43.601 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
pa_table = self.api.parquet.read_table(
18:51:43.602 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
dataset = _ParquetDatasetV2(
18:51:43.603 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
finfo = filesystem.get_file_info(path_or_paths)
18:51:43.604 | INFO | Flow run 'nickel-crow' - File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
18:51:43.604 | INFO | Flow run 'nickel-crow' - File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
18:51:43.605 | INFO | Flow run 'nickel-crow' - File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
18:51:43.606 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
info = self.fs.info(path)
18:51:43.606 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
return sync(self.loop, func, *args, **kwargs)
18:51:43.607 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
raise return_result
18:51:43.608 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
result[0] = await coro
18:51:43.609 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
exact = await self._get_object(path)
18:51:43.609 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
res = await self._call(
18:51:43.610 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
status, headers, info, contents = await self._request(
18:51:43.611 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
return await caller(func, *(extras + args), **kw)
18:51:43.612 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
raise e
18:51:43.612 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
return await func(*args, **kwargs)
18:51:43.613 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
validate_response(status, contents, path, args)
18:51:43.614 | INFO | Flow run 'nickel-crow' - File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
raise HttpError(error)
sundeep
05/02/2023, 7:01 PM
18:51:43.615 | INFO | Flow run 'nickel-crow' - gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:43.548 | ERROR | Flow run 'cordial-elephant' - Encountered exception during execution:
18:51:44.073 | ERROR | Flow run 'cordial-elephant' - Finished in state Failed("Flow run encountered an exception. gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401\n")
18:51:44.075 | ERROR | Flow run 'nickel-crow' - Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
result = await flow_call.aresult()
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "scripts/gcs_to_bq.py", line 305, in main
num = pipeline(bucket_name, prefix, gcp_dataset_name, gcp_dataset_table)
File "/usr/local/lib/python3.8/site-packages/prefect/flows.py", line 468, in __call__
return enter_flow_run_engine_from_flow_call(
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 184, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/usr/local/lib/python3.8/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 566, in create_and_begin_subflow_run
return await terminal_state.result(fetch=True)
File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
result = await flow_call.aresult()
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "scripts/gcs_to_bq.py", line 265, in pipeline
gcs_to_bq(file, credentials, bucket_name, gcp_dataset_name, gcp_dataset_table)
File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 485, in __call__
return enter_task_run_engine(
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
return await future._result()
File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 241, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
result = await call.aresult()
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "scripts/gcs_to_bq.py", line 84, in gcs_to_bq
df = pd.read_parquet(f"gs://{bucket_name}/{file}")
File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 509, in read_parquet
return impl.read(
File "/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py", line 227, in read
pa_table = self.api.parquet.read_table(
File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2926, in read_table
dataset = _ParquetDatasetV2(
File "/usr/local/lib/python3.8/site-packages/pyarrow/parquet/core.py", line 2452, in __init__
finfo = filesystem.get_file_info(path_or_paths)
File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
File "/usr/local/lib/python3.8/site-packages/pyarrow/fs.py", line 330, in get_file_info
info = self.fs.info(path)
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 114, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 99, in sync
raise return_result
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 54, in _runner
result[0] = await coro
File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 791, in _info
exact = await self._get_object(path)
File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 491, in _get_object
res = await self._call(
File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 418, in _call
status, headers, info, contents = await self._request(
File "/usr/local/lib/python3.8/site-packages/decorator.py", line 221, in fun
return await caller(func, *(extras + args), **kw)
File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 149, in retry_request
raise e
File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 114, in retry_request
return await func(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 411, in _request
validate_response(status, contents, path, args)
File "/usr/local/lib/python3.8/site-packages/gcsfs/retry.py", line 101, in validate_response
raise HttpError(error)
gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
18:51:44.571 | ERROR | Flow run 'nickel-crow' - Finished in state Failed("Flow run encountered an exception. gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401\n")
kasteph
05/09/2023, 10:28 PM
storage.objects.get permission for that bucket.
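The "Anonymous caller" wording in the error suggests that the request made by pd.read_parquet carried no credentials at all, even though the gcs_to_bq task receives a credentials argument. Below is a minimal sketch of one way to forward them, assuming credentials is the same google-auth credentials object passed to storage.Client in list_gcs. The helper name gcs_read_kwargs is hypothetical; it only builds the arguments. pandas forwards storage_options to gcsfs, which accepts a credentials object as token.

```python
from typing import Any, Dict, Optional


def gcs_read_kwargs(
    bucket_name: str, file: str, credentials: Optional[Any] = None
) -> Dict[str, Any]:
    """Build keyword arguments for pd.read_parquet on a gs:// path.

    Hypothetical helper for illustration. `credentials` is assumed to be
    the google-auth credentials object already used for storage.Client.
    """
    kwargs: Dict[str, Any] = {"path": f"gs://{bucket_name}/{file}"}
    if credentials is not None:
        # pandas hands storage_options to the fsspec filesystem (gcsfs here);
        # gcsfs takes the credentials object through its `token` argument.
        kwargs["storage_options"] = {"token": credentials}
    return kwargs


# Inside the gcs_to_bq task the call would then look like this
# (not executed here, since it needs pandas, gcsfs and bucket access):
# df = pd.read_parquet(**gcs_read_kwargs(bucket_name, file, credentials))
```

If the error persists with credentials attached, the message should name the service account instead of "Anonymous caller", which would point to a missing storage.objects.get grant on the bucket.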