Josh Rosenstein
06/22/2021, 3:42 PM
I'm trying to switch from
for flow in ${flows[@]}; do prefect register flow --file flows/$flow --project my_project --skip-if-flow-metadata-unchanged; done
(which currently works) to
prefect register --path flows/ --project my_project
where my flows use Azure storage and live in the flows folder. The new register command does create a new blob, but then errors out with
Error building storage: azure.core.exceptions.ResourceExistsError: Operation returned an invalid status 'The specified blob already exists.'
for each flow, which ultimately fails the job. For now I will just continue to use `prefect register flow`, but wanted to see if anyone was having similar issues.
Zanie
Josh Rosenstein
06/22/2021, 3:58 PM
Zanie
self.logger.info(
    "Uploading {} to {}".format(self.flows[flow_name], self.container)
)
Zanie
Zanie
storage = Azure(container="foo")
flow1.storage = storage
flow2.storage = storage
or
flow1.storage = Azure(container="foo")
flow2.storage = Azure(container="foo")
Josh Rosenstein
06/22/2021, 4:03 PM
Processing 'flows/mapping.py':
Building `Azure` storage...
[2021-06-22 14:57:33+0000] INFO - prefect.Azure | Uploading example-mapping/2021-06-22t14-57-33-768529-00-00 to my_blob_container
Registering 'Example: Mapping'... Done
└── ID: 9fc872a1-a599-4300-bcb6-161c05353b9f
└── Version: 7
Registering 'Example: Parameters'... Error
Processing 'flows/hello-flow.py':
Building `Azure` storage...
[2021-06-22 14:57:35+0000] INFO - prefect.Azure | Uploading example-mapping/2021-06-22t14-57-33-768529-00-00 to my_blob_container
Error building storage:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
**kwargs)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 228, in upload
map_error(status_code=response.status_code, response=response, error_map=error_map)
File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 102, in map_error
raise error
azure.core.exceptions.ResourceExistsError: Operation returned an invalid status 'The specified blob already exists.'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/prefect/cli/build_register.py", line 451, in build_and_register
storage.build()
File "/usr/local/lib/python3.6/dist-packages/prefect/storage/azure.py", line 146, in build
client.upload_blob(data)
File "/usr/local/lib/python3.6/dist-packages/azure/core/tracing/decorator.py", line 83, in wrapper_use_tracer
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_blob_client.py", line 693, in upload_blob
return upload_block_blob(**options)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 157, in upload_block_blob
process_storage_error(error)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_shared/response_handlers.py", line 150, in process_storage_error
error.raise_with_traceback()
File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 244, in raise_with_traceback
raise super(AzureError, self).with_traceback(self.exc_traceback)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
**kwargs)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 228, in upload
map_error(status_code=response.status_code, response=response, error_map=error_map)
File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 102, in map_error
raise error
azure.core.exceptions.ResourceExistsError: The specified blob already exists.
RequestId:d941d048-301e-003a-4376-67d318000000
Time:2021-06-22T14:57:35.7773979Z
ErrorCode:BlobAlreadyExists
Error:None
Josh Rosenstein
06/22/2021, 4:12 PM
[2021-06-22 14:57:35+0000] INFO - prefect.Azure | Uploading example-mapping/2021-06-22t14-57-33-768529-00-00
, even though each flow does have a new file uploaded.
I can try creating a new Storage class for each flow, if you think that's what's causing the issue.
Zanie
Zanie
# Group flows by storage instance.
storage_to_flows = defaultdict(list)
for flow in flows:
    storage_to_flows[flow.storage].append(flow)
for storage, flows in storage_to_flows.items():
    # Build storage
    click.echo(f" Building `{type(storage).__name__}` storage...")
    try:
        storage.build()
        ...
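A minimal sketch of what grouping by storage instance implies, assuming the grouping key is the storage object itself (hashed by identity). `FakeStorage`, `FakeFlow`, and `group_by_storage` are hypothetical stand-ins for illustration only, not Prefect classes or functions:

```python
from collections import defaultdict


class FakeStorage:
    """Hypothetical stand-in for a Prefect storage object (not the real class)."""

    def __init__(self, container):
        self.container = container


class FakeFlow:
    """Hypothetical stand-in for a Prefect flow."""

    def __init__(self, name, storage):
        self.name = name
        self.storage = storage


def group_by_storage(flows):
    """Group flows by storage *instance*, mirroring the excerpt above."""
    storage_to_flows = defaultdict(list)
    for flow in flows:
        # Plain objects hash by identity, so equal-looking but distinct
        # storage objects end up under different keys.
        storage_to_flows[flow.storage].append(flow)
    return storage_to_flows


# One storage object shared by both flows: a single group.
shared = FakeStorage("foo")
shared_groups = group_by_storage([FakeFlow("a", shared), FakeFlow("b", shared)])

# A separate storage object per flow: one group per flow.
separate_groups = group_by_storage(
    [FakeFlow("a", FakeStorage("foo")), FakeFlow("b", FakeStorage("foo"))]
)

print(len(shared_groups), len(separate_groups))
```

With a shared instance, every flow attached to it is built together; with separate instances, each flow gets its own build.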
Zanie
Zanie
Zanie
❯ cat a.py
from prefect import Flow
from base import storage
with Flow("a") as flow:
    pass
flow.storage = storage
❯ cat b.py
from prefect import Flow
from base import storage
with Flow("b") as flow:
    pass
flow.storage = storage
❯ cat base.py
from prefect.storage import Azure
storage = Azure("foo")
Zanie
Josh Rosenstein
06/22/2021, 4:25 PM
from prefect import Flow
from utils.config import azure_store, docker_run
with Flow("Example: Mapping", run_config=docker_run, storage=azure_store) as flow:
    pass
from prefect import Flow
from utils.config import azure_store, docker_run
with Flow("Example: Parameters") as flow:
    pass
flow.run_config = docker_run
flow.storage = azure_store
Zanie
Josh Rosenstein
06/22/2021, 4:41 PM
Zanie
Zanie
Zanie
Zanie
Zanie
storage.build()
calls for storage instances shared across files"
Marvin
06/22/2021, 4:49 PM
Josh Rosenstein
06/22/2021, 4:49 PM
prefect register --path flows/ --project prefect2
shell: /bin/bash -e {0}
env:
  ACR: tprefect.azurecr.io
  PROJECT: prefect2
  AZURE_STORAGE_CONNECTION_STRING: ***
  IMAGE_URL: tprefect.azurecr.io/prefect2:latest
a.py b.py
prefect2 already exists
Collecting flows...
Processing 'flows/a.py':
Building `Azure` storage...
[2021-06-22 16:45:03+0000] INFO - prefect.Azure | Uploading a/2021-06-22t16-45-03-374828-00-00 to foo
Registering 'a'... Done
└── ID: 0c7cbbd9-38e8-4786-9125-29b88c81409e
└── Version: 1
Processing 'flows/b.py':
Building `Azure` storage...
[2021-06-22 16:45:04+0000] INFO - prefect.Azure | Uploading a/2021-06-22t16-45-03-374828-00-00 to foo
Error building storage:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
**kwargs)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 228, in upload
map_error(status_code=response.status_code, response=response, error_map=error_map)
File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 102, in map_error
raise error
azure.core.exceptions.ResourceExistsError: Operation returned an invalid status 'The specified blob already exists.'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/prefect/cli/build_register.py", line 451, in build_and_register
storage.build()
File "/usr/local/lib/python3.6/dist-packages/prefect/storage/azure.py", line 146, in build
client.upload_blob(data)
File "/usr/local/lib/python3.6/dist-packages/azure/core/tracing/decorator.py", line 83, in wrapper_use_tracer
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_blob_client.py", line 693, in upload_blob
return upload_block_blob(**options)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 157, in upload_block_blob
process_storage_error(error)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_shared/response_handlers.py", line 150, in process_storage_error
error.raise_with_traceback()
File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 244, in raise_with_traceback
raise super(AzureError, self).with_traceback(self.exc_traceback)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
**kwargs)
File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 228, in upload
map_error(status_code=response.status_code, response=response, error_map=error_map)
File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 102, in map_error
raise error
azure.core.exceptions.ResourceExistsError: The specified blob already exists.
RequestId:f28d19fc-c01e-0060-3d85-67d599000000
Time:2021-06-22T16:45:04.9856152Z
ErrorCode:BlobAlreadyExists
Error:None
Registering 'b'... Error
================== 1 registered, 1 errored ==================
Josh Rosenstein
06/22/2021, 4:50 PM
Josh Rosenstein
06/22/2021, 4:55 PM
from prefect.storage import Azure
def azure_store():
    # Return a fresh storage instance per flow instead of sharing one.
    return Azure(container="foo")
from prefect import Flow
from utils.config import azure_store, docker_run
with Flow("a") as flow:
    pass
flow.run_config = docker_run
flow.storage = azure_store()
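The logs above show the second file re-uploading the first flow's blob name, which is consistent with a shared storage object remembering every flow it has seen. The following is a simplified model of that behaviour under that assumption, not Prefect's implementation; `FakeAzure`, `BlobExistsError`, and `register` are hypothetical names used only for illustration:

```python
class BlobExistsError(Exception):
    """Stand-in for azure.core.exceptions.ResourceExistsError."""


class FakeAzure:
    """Hypothetical model of a storage object; not Prefect's Azure class."""

    container_blobs = set()  # pretend container contents, shared by all instances

    def __init__(self, container):
        self.container = container
        self.flows = {}  # flow name -> blob name, accumulated per instance

    def add_flow(self, flow_name):
        self.flows[flow_name] = f"{flow_name}/blob"

    def build(self):
        # Uploads every flow this instance knows about, without overwrite.
        for blob in self.flows.values():
            if blob in FakeAzure.container_blobs:
                raise BlobExistsError(blob)
            FakeAzure.container_blobs.add(blob)


def register(files):
    """Build storage once per file and record a status per flow."""
    results = {}
    for flow_name, storage in files:
        storage.add_flow(flow_name)
        try:
            storage.build()
            results[flow_name] = "Done"
        except BlobExistsError:
            results[flow_name] = "Error"
    return results


# Shared instance: building for "b" tries to upload "a" again and fails.
shared = FakeAzure("foo")
shared_results = register([("a", shared), ("b", shared)])

FakeAzure.container_blobs.clear()  # start again with an empty container


def azure_store():
    """Factory returning a fresh instance per flow, as in the workaround above."""
    return FakeAzure("foo")


factory_results = register([("a", azure_store()), ("b", azure_store())])

print(shared_results)
print(factory_results)
```

In this model the shared instance reproduces the "1 registered, 1 errored" outcome, while the factory gives each flow its own storage object and both builds succeed.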
Zanie
Josh Rosenstein
06/22/2021, 4:56 PM