Keith
08/05/2022, 3:51 PM
prefect-gcp to do it, but when I combine this with block information, it seems like the info is not in the correct format.
from prefect import flow
from prefect.filesystems import GCS
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file

gcs_block = GCS.load("gcs-dev")

@flow()
def example_cloud_storage_upload_blob_from_file_flow():
    gcp_credentials = GcpCredentials(service_account_info=gcs_block.service_account_info)
    test_upload_file = "test_upload.txt"
    blob = cloud_storage_upload_blob_from_file(test_upload_file, gcs_block.bucket_path, "test_upload.txt", gcp_credentials)
    return blob
  File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for GcpCredentials
service_account_info
  JSON object must be str, bytes or bytearray (type=type_error.json)
Anna Geller
08/05/2022, 7:58 PM
Keith
08/05/2022, 9:04 PM
1. Call get_secret_value() after the service_account_info is loaded from the block
2. Pass that object through json.dumps() b/c the GcpCredentials class is expecting a string.
import json
from prefect import flow
from prefect.filesystems import GCS
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file

gcs_block = GCS.load("gcs-dev")

@flow()
def example_cloud_storage_upload_blob_from_file_flow():
    gcp_credentials = GcpCredentials(service_account_info=json.dumps(gcs_block.service_account_info.get_secret_value()))
    test_upload_file = "test_upload.txt"
    blob = cloud_storage_upload_blob_from_file(test_upload_file, gcs_block.bucket_path, "test_upload.txt", gcp_credentials)
    return blob
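For anyone hitting the same error, here is a rough stdlib-only sketch of why the first attempt probably failed and why the two steps help. It does not use Prefect or pydantic: FakeSecret is a hypothetical stand-in for the secret wrapper on the block, parse_service_account_info is a hypothetical stand-in for a JSON-typed field that hands its input to json.loads, and the dict contents are made up. It also assumes the unwrapped value is a dict, which the thread does not state.

```python
import json


class FakeSecret:
    """Hypothetical stand-in for a secret wrapper with get_secret_value()."""

    def __init__(self, value: dict) -> None:
        self._value = value

    def get_secret_value(self) -> dict:
        return self._value


def parse_service_account_info(raw) -> dict:
    # Hypothetical stand-in for a JSON-typed field: it only accepts
    # str, bytes or bytearray, because it calls json.loads on the input.
    return json.loads(raw)


# Made-up credentials payload, for illustration only.
secret = FakeSecret({"type": "service_account", "project_id": "example-project"})

# First attempt: pass the wrapper itself. json.loads rejects it with a TypeError.
try:
    parse_service_account_info(secret)
    wrapper_error = None
except TypeError as exc:
    wrapper_error = str(exc)

# Fix: unwrap the secret (step 1), then serialize it to a JSON string (step 2).
as_json_string = json.dumps(secret.get_secret_value())
info = parse_service_account_info(as_json_string)

print(wrapper_error)
print(info["project_id"])
```

If get_secret_value() already returns a JSON string on your block, the json.dumps() step may be unnecessary, so it is worth checking the type of the unwrapped value first.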
Anna Geller
08/05/2022, 11:20 PM