Mike Cole
07/14/2023, 12:40 AM
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_cloud_storage
@flow
def example_bigquery_load_cloud_storage_flow():
    """Load a Parquet file from Cloud Storage into a BigQuery table.

    Creates ``GcpCredentials`` for the ``mpls-311`` project and calls
    ``bigquery_load_cloud_storage`` to load the object at the given
    ``gs://`` URI into ``mpls_311_staging.stg_mpls_311data``.

    Returns:
        The value returned by ``bigquery_load_cloud_storage``.
    """
    gcp_credentials = GcpCredentials(project="mpls-311")
    # NOTE(review): no `location` is passed, so the task's default job
    # location applies. BigQuery can answer "Not found: Dataset" when the job
    # location differs from the dataset's region -- confirm the region of
    # `mpls_311_staging` and pass it explicitly if it is not the default.
    result = bigquery_load_cloud_storage(
        dataset="mpls_311_staging",
        table="stg_mpls_311data",
        # Plain gs:// URI: the "<...>" wrapper was link markup, not part of
        # the object path, and is not a valid Cloud Storage URI.
        uri="gs://data_lake_mpls-311/data/pq/mpls_311data_2023.parquet",
        gcp_credentials=gcp_credentials,
    )
    return result


example_bigquery_load_cloud_storage_flow()
The script below, which creates a table in my BQ dataset using bigquery_create_table, successfully finds my dataset and builds a table in it. As far as I can see, I have both scripts set up the same way with respect to identifying my GCP project and BQ dataset.
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table
from google.cloud.bigquery import SchemaField
# Flow that creates `test_table` in the `mpls_311_staging` dataset with a
# three-column schema (number, text, bool) using credentials for the
# `mpls-311` project, and returns whatever `bigquery_create_table` returns.
@flow
def example_bigquery_create_table_flow():
    credentials = GcpCredentials(project="mpls-311")

    # Column definitions, in the order they are handed to the task.
    number_column = SchemaField("number", field_type="INTEGER", mode="REQUIRED")
    text_column = SchemaField("text", field_type="STRING", mode="REQUIRED")
    bool_column = SchemaField("bool", field_type="BOOLEAN")

    return bigquery_create_table(
        dataset="mpls_311_staging",
        table="test_table",
        schema=[number_column, text_column, bool_column],
        gcp_credentials=credentials,
    )


example_bigquery_create_table_flow()
I'm stumped. Anyone have any ideas on what I might be doing wrong? I'll include the full traceback in the thread.
13:40:30.518 | ERROR | Flow run 'foamy-dog' - Finished in state Failed('Flow run encountered an exception. google.api_core.exceptions.NotFound: 404 POST <https://bigquery.googleapis.com/bigquery/v2/projects/mpls-311/jobs?prettyPrint=false>: Not found: Dataset mpls-311:mpls_311_staging\n')
Traceback (most recent call last):
File "/home/mike/git/mpls-311-data/flows/bq_test2.py", line 16, in <module>
example_bigquery_load_cloud_storage_flow()
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/flows.py", line 448, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 164, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 68, in run
return asynclib.run(func, *args, **backend_options)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 204, in run
return native_run(wrapper(), debug=debug)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 199, in wrapper
return await func(*args)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 244, in create_then_begin_flow_run
return await state.result(fetch=True)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/states.py", line 89, in _get_state_result
raise await get_state_exception(state)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 636, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
raise exceptions[0]
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "/home/mike/git/mpls-311-data/flows/bq_test2.py", line 8, in example_bigquery_load_cloud_storage_flow
result = bigquery_load_cloud_storage(
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/tasks.py", line 436, in __call__
return enter_task_run_engine(
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 926, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/from_thread.py", line 47, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 906, in run_async_from_thread
return f.result()
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 1064, in get_task_call_return_value
return await future._result()
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/states.py", line 89, in _get_state_result
raise await get_state_exception(state)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 1438, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect_gcp/bigquery.py", line 406, in bigquery_load_cloud_storage
result = await to_thread.run_sync(partial_load)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect_gcp/bigquery.py", line 40, in _result_sync
result = func(*args, **kwargs).result()
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/cloud/bigquery/client.py", line 2375, in load_table_from_uri
load_job._begin(retry=retry, timeout=timeout)
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/cloud/bigquery/job/base.py", line 693, in _begin
api_response = client._call_api(
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/cloud/bigquery/client.py", line 813, in _call_api
return call()
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/api_core/retry.py", line 349, in retry_wrapped_func
return retry_target(
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/api_core/retry.py", line 191, in retry_target
return target()
File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST <https://bigquery.googleapis.com/bigquery/v2/projects/mpls-311/jobs?prettyPrint=false>: Not found: Dataset mpls-311:mpls_311_staging
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_cloud_storage
@flow
def example_bigquery_load_cloud_storage_flow():
    """Load a Parquet file from Cloud Storage into a BigQuery table.

    Same load as the earlier attempt, but with an explicit ``job_config``
    (write disposition and source format) and an explicit ``location``
    argument passed to ``bigquery_load_cloud_storage``.

    Returns:
        The value returned by ``bigquery_load_cloud_storage``.
    """
    gcp_credentials = GcpCredentials(project="mpls-311")
    # Load-job options forwarded to the task as a plain dict.
    job_config = {
        'write_disposition': 'WRITE_APPEND',
        'source_format': 'PARQUET',
    }
    result = bigquery_load_cloud_storage(
        dataset="mpls_311_staging",
        table="stg_mpls_311data",
        # Plain gs:// URI: the "<...>" wrapper was link markup, not part of
        # the object path, and is not a valid Cloud Storage URI.
        uri="gs://data_lake_mpls-311/data/pq/mpls_311data_2023.parquet",
        gcp_credentials=gcp_credentials,
        job_config=job_config,
        # NOTE(review): an empty location is passed here; presumably this
        # should be the region of `mpls_311_staging` -- confirm and set it
        # to the dataset's actual location.
        location=''
    )
    return result


example_bigquery_load_cloud_storage_flow()