Mike Cole

07/14/2023, 12:40 AM
I'm hoping someone might have some ideas on how to troubleshoot an issue I'm having with prefect-gcp, specifically the bigquery_load_cloud_storage function. I'm trying to load some files I have in a Google Cloud Storage bucket into a BigQuery dataset with this function. I've used some example scripts from the prefect-gcp docs and am getting mixed results with them finding my BQ dataset. The script below, which loads a file from a GCS bucket into BQ, fails with a 404 error saying it can't find my BQ dataset:
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_cloud_storage

@flow
def example_bigquery_load_cloud_storage_flow():
    gcp_credentials = GcpCredentials(project="mpls-311")
    result = bigquery_load_cloud_storage(
        dataset="mpls_311_staging",
        table="stg_mpls_311data",
        uri="<gs://data_lake_mpls-311/data/pq/mpls_311data_2023.parquet>",
        gcp_credentials=gcp_credentials
    )
    return result

example_bigquery_load_cloud_storage_flow()
The script below, which creates a table in my BQ dataset using bigquery_create_table, successfully finds my dataset and builds a table in it. As far as I can tell, I have it set up the same way with respect to identifying my GCP project and BQ dataset.
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_create_table
from google.cloud.bigquery import SchemaField

@flow
def example_bigquery_create_table_flow():
    gcp_credentials = GcpCredentials(project="mpls-311")
    schema = [
        SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
        SchemaField("text", field_type="STRING", mode="REQUIRED"),
        SchemaField("bool", field_type="BOOLEAN")
    ]
    result = bigquery_create_table(
        dataset="mpls_311_staging",
        table="test_table",
        schema=schema,
        gcp_credentials=gcp_credentials
    )
    return result

example_bigquery_create_table_flow()
I'm stumped. Anyone have any ideas on what I might be doing wrong? I'll include the full traceback in the thread.
The traceback error I get for the first script, using bigquery_load_cloud_storage:
13:40:30.518 | ERROR   | Flow run 'foamy-dog' - Finished in state Failed('Flow run encountered an exception. google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/projects/mpls-311/jobs?prettyPrint=false: Not found: Dataset mpls-311:mpls_311_staging\n')
Traceback (most recent call last):
  File "/home/mike/git/mpls-311-data/flows/bq_test2.py", line 16, in <module>
    example_bigquery_load_cloud_storage_flow()
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/flows.py", line 448, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 164, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 68, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 204, in run
    return native_run(wrapper(), debug=debug)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 199, in wrapper
    return await func(*args)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 244, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/states.py", line 89, in _get_state_result
    raise await get_state_exception(state)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 636, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/mike/git/mpls-311-data/flows/bq_test2.py", line 8, in example_bigquery_load_cloud_storage_flow
    result = bigquery_load_cloud_storage(
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/tasks.py", line 436, in __call__
    return enter_task_run_engine(
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 926, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/from_thread.py", line 47, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 906, in run_async_from_thread
    return f.result()
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 1064, in get_task_call_return_value
    return await future._result()
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/states.py", line 89, in _get_state_result
    raise await get_state_exception(state)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect/engine.py", line 1438, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect_gcp/bigquery.py", line 406, in bigquery_load_cloud_storage
    result = await to_thread.run_sync(partial_load)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/prefect_gcp/bigquery.py", line 40, in _result_sync
    result = func(*args, **kwargs).result()
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/cloud/bigquery/client.py", line 2375, in load_table_from_uri
    load_job._begin(retry=retry, timeout=timeout)
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/cloud/bigquery/job/base.py", line 693, in _begin
    api_response = client._call_api(
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/cloud/bigquery/client.py", line 813, in _call_api
    return call()
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/api_core/retry.py", line 349, in retry_wrapped_func
    return retry_target(
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/api_core/retry.py", line 191, in retry_target
    return target()
  File "/home/mike/anaconda3/envs/mpls311/lib/python3.9/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/projects/mpls-311/jobs?prettyPrint=false: Not found: Dataset mpls-311:mpls_311_staging
Figured this out after more troubleshooting and I'm including what I found in case it helps others later. I tracked the issue down to dataset locations in GCP. My existing datasets are in the us-central1 region. When I created a new test dataset with its location set to US, bigquery_load_cloud_storage could successfully find it. It looks like the prefect-gcp functions default to location="US", though I'm still not sure why bigquery_create_table could create a table in my us-central1 dataset when bigquery_load_cloud_storage couldn't find it.
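For anyone else debugging this, here's a quick way to check where a dataset actually lives, using the google-cloud-bigquery client directly. A minimal sketch, assuming your application default credentials can see the project:

from google.cloud import bigquery

# Minimal diagnostic sketch: print each dataset's location so a mismatch
# with the default job location ("US") is easy to spot.
client = bigquery.Client(project="mpls-311")
for item in client.list_datasets():
    dataset = client.get_dataset(item.dataset_id)
    print(dataset.dataset_id, dataset.location)  # e.g. mpls_311_staging us-central1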
If I call the bigquery_load_cloud_storage function with an empty string for the location parameter, it ends up finding my dataset in us-central1 and appends the data from the file in GCS to the BQ table. Working script below for reference:
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_cloud_storage

@flow
def example_bigquery_load_cloud_storage_flow():
    gcp_credentials = GcpCredentials(project="mpls-311")

    job_config = {'write_disposition': 'WRITE_APPEND',
                  'source_format': 'PARQUET'}

    result = bigquery_load_cloud_storage(
        dataset="mpls_311_staging",
        table="stg_mpls_311data",
        uri="<gs://data_lake_mpls-311/data/pq/mpls_311data_2023.parquet>",
        gcp_credentials=gcp_credentials,
        job_config=job_config,
        location=''
    )
    return result

example_bigquery_load_cloud_storage_flow()
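Untested on my end, but passing the dataset's actual region explicitly should presumably also work, and it reads more clearly than an empty string. A hypothetical variation on the flow above:

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_cloud_storage

@flow
def load_with_explicit_location_flow():
    # Hypothetical variation: pin the load job to the dataset's actual
    # region instead of clearing the "US" default with an empty string.
    gcp_credentials = GcpCredentials(project="mpls-311")
    job_config = {"write_disposition": "WRITE_APPEND", "source_format": "PARQUET"}
    return bigquery_load_cloud_storage(
        dataset="mpls_311_staging",
        table="stg_mpls_311data",
        uri="gs://data_lake_mpls-311/data/pq/mpls_311data_2023.parquet",
        gcp_credentials=gcp_credentials,
        job_config=job_config,
        location="us-central1",  # match the dataset's region
    )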