matta — 07/28/2021, 3:11 AM
BigQueryLoadGoogleCloudStorage seems to not be inferring my GCP credentials? Not sure how to pass it, either (I can grab it as a PrefectSecret fine, and can pass it directly to GCP functions).
Kevin Kho
Is it stored as GCP_CREDENTIALS? What is the error you get? Maybe your Flow is looking for local secrets?

matta — 07/28/2021, 3:19 AM
It's GCP_CREDENTIALS.
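When a Prefect 1.x flow runs without a backend, PrefectSecret resolves values from local context rather than from Prefect Cloud, which is what the "local secrets" point refers to. A minimal sketch of supplying the secret locally for testing, assuming the prefect.context.secrets mechanism and a hypothetical service-account key path:

import json

import prefect

# Minimal local-testing sketch (assumption: Prefect 1.x resolves PrefectSecret from
# prefect.context.secrets when cloud.use_local_secrets is enabled, the local default).
# "service-account.json" is a hypothetical path to a downloaded key file.
with open("service-account.json") as f:
    prefect.context.setdefault("secrets", {})["GCP_CREDENTIALS"] = json.load(f)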
matta — 07/28/2021, 3:21 AM
Should I use a GCPSecret task instead of PrefectSecret("GCP_CREDENTIALS")?

Kevin Kho

matta — 07/28/2021, 3:27 AM
matta — 07/28/2021, 3:51 AM

from google.cloud import bigquery
from google.oauth2 import service_account
from prefect import task
from prefect.tasks.secrets import PrefectSecret


@task
def storage_to_bigquery(
    secret: PrefectSecret, project: str, dataset_name: str, table: str, file_uri: str
) -> str:
    # At runtime `secret` is the resolved GCP_CREDENTIALS value (a service-account dict).
    full_dataset_id = f"{project}.{dataset_name}"
    credentials = service_account.Credentials.from_service_account_info(secret)
    bigquery_client = bigquery.Client(credentials=credentials)

    dataset_ref = bigquery.dataset.Dataset.from_string(full_dataset_id)
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        autodetect=True,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )

    # Load the newline-delimited JSON file from GCS into the target table,
    # replacing any existing contents.
    load_job = bigquery_client.load_table_from_uri(
        file_uri, dataset_ref.table(table), job_config=job_config
    )
    print(f"Starting job {load_job.job_id}")
    load_job.result()  # Waits for the table load to complete.
    return "Done"