Bob De Schutter
05/20/2022, 2:58 PMfor flow in flows:
flow.register(
project_name=f"Data Platform - {os.environ.get('PREFECT_TARGET_ENV', 'Develop')}",
add_default_labels=False,
set_schedule_active=True if os.environ.get('PREFECT_TARGET_ENV', 'Develop') == 'Production' else False
)
This has worked for me before but since I upgraded prefect to 1.2.1, I'm getting an error message related to the Azure storage I'm using for the flows:
azure.core.exceptions.ResourceExistsError: The specified blob already exists
Any idea why this strategy is not working anymore?Kevin Kho
05/20/2022, 3:00 PMBob De Schutter
05/20/2022, 3:01 PM[2022-05-20 16:54:23+0200] INFO - prefect.Azure | Uploading dbt-datawarehouse-daily/2022-05-20t14-54-22-329817-00-00 to datawarehouse-prefect-flows
Flow URL: <https://cloud.prefect.io/gv-energy-group/flow/4c3ff3d7-ac87-43f4-bfe8-ded70fb20b22>
└── ID: c759911b-fb92-493e-90e4-1925f0663d44
└── Project: Data Platform - Develop
└── Labels: []
[2022-05-20 16:54:25+0200] INFO - prefect.Azure | Uploading dbt-datawarehouse-daily/2022-05-20t14-54-22-329817-00-00 to datawarehouse-prefect-flows
Traceback (most recent call last):
Kevin Kho
05/20/2022, 3:02 PMBob De Schutter
05/20/2022, 3:03 PMSTORAGE=Azure(
container="datawarehouse-prefect-flows",
connection_string_secret="azure-container-connection-string"
)
with Flow(
name=f"dbt-{flow['name']}",
storage=STORAGE,
run_config=DockerRun(image=DOCKER_IMAGE),
schedule=schedule) as dbt_flow:
Kevin Kho
05/20/2022, 3:05 PMBob De Schutter
05/20/2022, 3:08 PM<Flow: name="dbt-datawarehouse-daily">
[2022-05-20 17:08:01+0200] INFO - prefect.Azure | Uploading dbt-datawarehouse-daily/2022-05-20t15-08-00-758148-00-00 to datawarehouse-prefect-flows
Flow URL: <https://cloud.prefect.io/gv-energy-group/flow/4c3ff3d7-ac87-43f4-bfe8-ded70fb20b22>
└── ID: b3c85743-d98e-4fd4-9dde-f24ec1a8a0a0
└── Project: Data Platform - Develop
└── Labels: []
<Flow: name="dbt-smartreporting">
[2022-05-20 17:08:03+0200] INFO - prefect.Azure | Uploading dbt-datawarehouse-daily/2022-05-20t15-08-00-758148-00-00 to datawarehouse-prefect-flows
Traceback (most recent call last):
Kevin Kho
05/20/2022, 3:11 PMflow.storage
maybe?Bob De Schutter
05/20/2022, 3:17 PMStorage: Azure
twice, also storage.blob_name and storage.flows are emptyKevin Kho
05/20/2022, 3:21 PMBob De Schutter
05/20/2022, 3:21 PMKevin
07/07/2022, 5:57 PMprefect register -p $GITHUB_WORKSPACE/orchestrate/flows/**/* --project $PREFECT_PROJECT
Collecting flows...
Processing '/home/runner/work/analytics/analytics/orchestrate/flows/salesforce/salesforce.py':
Building `Azure` storage...
[2022-07-07 17:52:24+0000] INFO - prefect.Azure | Uploading salesforce/2022-07-07t17-52-24-299152-00-00 to prefect
Registering 'salesforce'... Done
└── ID: 69ac0015-cccc-43d0-9187-a1be71fc2238
└── Version: 5
Processing '/home/runner/work/analytics/analytics/orchestrate/flows/deployment/deploy_new_source.py':
Building `Azure` storage...
[2022-07-07 17:52:26+0000] INFO - prefect.Azure | Uploading salesforce/2022-07-07t17-52-24-299152-00-00 to prefect
Error building storage:
Traceback (most recent call last):
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/prefect/cli/build_register.py", line 463, in build_and_register
storage.build()
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/prefect/storage/azure.py", line 155, in build
client.upload_blob(data, overwrite=self.overwrite)
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_blob_client.py", line 728, in upload_blob
return upload_block_blob(**options)
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_upload_helpers.py", line 177, in upload_block_blob
process_storage_error(error)
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_shared/response_handlers.py", line 181, in process_storage_error
exec("raise error from None") # pylint: disable=exec-used # nosec
File "<string>", line 1, in <module>
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_upload_helpers.py", line 115, in upload_block_blob
**kwargs)
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 801, in upload
map_error(status_code=response.status_code, response=response, error_map=error_map)
File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/core/exceptions.py", line 107, in map_error
raise error
azure.core.exceptions.ResourceExistsError: The specified blob already exists.
RequestId:80beb2d6-701e-0067-262a-925d68000000
Time:2022-07-07T17:52:26.1641349Z
ErrorCode:BlobAlreadyExists
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>BlobAlreadyExists</Code><Message>The specified blob already exists.
RequestId:80beb2d6-701e-0067-262a-925d68000000
Time:2022-07-07T17:52:26.1641349Z</Message></Error>
Kevin Kho
07/07/2022, 6:32 PMKevin
07/07/2022, 6:54 PMfrom prefect.storage import Azure
from prefect.run_configs import KubernetesRun
import os
azure_store = Azure(container="prefect", connection_string_secret="AZ_CONNECTION_STRING")
with Flow('salesforce',
run_config=kubernetes_run,
storage=azure_store
) as flow: