# ask-community
j
Hey all, wanted to post here before creating an issue on GitHub. I'm currently having an issue getting my CI/CD registration job working when trying to update
for flow in ${flows[@]}; do prefect register flow --file flows/$flow --project my_project --skip-if-flow-metadata-unchanged; done
(which currently works) to
prefect register --path flows/ --project my_project
where my flows use Azure storage and are located within the flows folder. The new register command does create a new blob, but then errors out with
Error building storage: azure.core.exceptions.ResourceExistsError: Operation returned an invalid status 'The specified blob already exists.'
for each flow, ultimately exiting out of the job. For now I will just continue to use `prefect register flow`, but wanted to see if anyone was having similar issues.
z
Hey @Josh Rosenstein -- this definitely sounds like a bug; are your flows sharing a storage object? I'm curious why it'd already exist.
j
@Zanie, all flows for the project share the same container. When browsing the files, it looks like each flow has a new file with a timestamp around the time I ran the registration workflow.
z
Do you have the logs where it says the name the flow will be uploading to?
self.logger.info(
                "Uploading {} to {}".format(self.flows[flow_name], self.container)
            )
In theory, there will be a duplicate entry if we're seeing this error
Do they share an instance of the storage class as well? e.g.
storage = Azure(container="foo")
flow1.storage = storage
flow2.storage = storage
or
flow1.storage = Azure(container="foo")
flow2.storage = Azure(container="foo")
j
Yes, they do share the same storage class instance. I think you're on to something there, because I just saw that the first flow did register, but all of the ones after it fail
Processing 'flows/mapping.py':
  Building `Azure` storage...
[2021-06-22 14:57:33+0000] INFO - prefect.Azure | Uploading example-mapping/2021-06-22t14-57-33-768529-00-00 to my_blob_container
  Registering 'Example: Mapping'... Done
  └── ID: 9fc872a1-a599-4300-bcb6-161c05353b9f
  └── Version: 7
Registering 'Example: Parameters'... Error
Processing 'flows/hello-flow.py':
  Building `Azure` storage...
[2021-06-22 14:57:35+0000] INFO - prefect.Azure | Uploading example-mapping/2021-06-22t14-57-33-768529-00-00 to my_blob_container
    Error building storage:
      Traceback (most recent call last):
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
    **kwargs)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 228, in upload
    map_error(status_code=response.status_code, response=response, error_map=error_map)
        File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 102, in map_error
    raise error
      azure.core.exceptions.ResourceExistsError: Operation returned an invalid status 'The specified blob already exists.'
      
During handling of the above exception, another exception occurred:

      Traceback (most recent call last):
        File "/usr/local/lib/python3.6/dist-packages/prefect/cli/build_register.py", line 451, in build_and_register
    storage.build()
        File "/usr/local/lib/python3.6/dist-packages/prefect/storage/azure.py", line 146, in build
    client.upload_blob(data)
        File "/usr/local/lib/python3.6/dist-packages/azure/core/tracing/decorator.py", line 83, in wrapper_use_tracer
    return func(*args, **kwargs)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_blob_client.py", line 693, in upload_blob
    return upload_block_blob(**options)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 157, in upload_block_blob
    process_storage_error(error)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_shared/response_handlers.py", line 150, in process_storage_error
    error.raise_with_traceback()
        File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 244, in raise_with_traceback
    raise super(AzureError, self).with_traceback(self.exc_traceback)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
    **kwargs)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 228, in upload
    map_error(status_code=response.status_code, response=response, error_map=error_map)
        File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 102, in map_error
    raise error
      azure.core.exceptions.ResourceExistsError: The specified blob already exists.
RequestId:d941d048-301e-003a-4376-67d318000000
Time:2021-06-22T14:57:35.7773979Z
ErrorCode:BlobAlreadyExists
Error:None
And I just realized that in the logs, all of them are using the first flow "Example-mapping"
[2021-06-22 14:57:35+0000] INFO - prefect.Azure | Uploading example-mapping/2021-06-22t14-57-33-768529-00-00
, even though each flow does have a new file uploaded. I can try creating a new Storage class instance for each flow, if you think that's what's causing the issue.
z
Hm actually this is something we explicitly account for in the build/register CLI
# Group flows by storage instance.
storage_to_flows = defaultdict(list)
for flow in flows:
    storage_to_flows[flow.storage].append(flow)

for storage, flows in storage_to_flows.items():
    # Build storage
    click.echo(f"  Building `{type(storage).__name__}` storage...")
    try:
        storage.build()
    ...
Each storage object should only be built once
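To illustrate the grouping idea above, here is a minimal sketch. It assumes the storage object hashes by identity, like any plain Python object. `DemoStorage` and the flow names are hypothetical stand-ins, not Prefect classes.

```python
from collections import defaultdict


class DemoStorage:
    """Hypothetical stand-in for a storage class (not Prefect's Azure)."""

    def __init__(self, container):
        self.container = container
        self.build_calls = 0

    def build(self):
        # Count how many times this instance is built.
        self.build_calls += 1


shared = DemoStorage("foo")
# flow1 and flow2 share one instance; flow3 has its own with the same container.
flows = [("flow1", shared), ("flow2", shared), ("flow3", DemoStorage("foo"))]

# Group flow names by storage instance (keyed by object identity).
storage_to_flows = defaultdict(list)
for name, storage in flows:
    storage_to_flows[storage].append(name)

# Build each distinct storage instance exactly once.
for storage in storage_to_flows:
    storage.build()

group_sizes = sorted(len(names) for names in storage_to_flows.values())
```

The shared instance ends up in one group and is built once, while the separately constructed instance forms its own group even though it points at the same container.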
In a toy example, I can confirm they're grouped correctly
❯ cat a.py
from prefect import Flow
from base import storage

with Flow("a") as flow:
    pass

flow.storage = storage


❯ cat b.py
from prefect import Flow
from base import storage

with Flow("b") as flow:
    pass

flow.storage = storage

❯ cat base.py
from prefect.storage import Azure

storage = Azure("foo")
It seems likely that there's something going on with the import of your 'Azure' storage object that is making it fail to match during the grouping process I shared
j
Hmm, the only difference from what I have is that I have a mix of ways I assign the storage:
# Flow 1: storage passed to the Flow constructor
from prefect import Flow

from utils.config import azure_store, docker_run

with Flow("Example: Mapping", run_config=docker_run, storage=azure_store) as flow:
    pass

# Flow 2: storage assigned after the flow is defined
from prefect import Flow

from utils.config import azure_store, docker_run

with Flow("Example: Parameters") as flow:
    pass

flow.run_config = docker_run
flow.storage = azure_store
z
Hm alright. I'm convinced this is a bug on our end again. I'll look into it a bit more and get back to you.
j
10-4, I'm recreating the example you showed to see if it passes, will let you know
z
I think our grouping by storage instance is only happening per file
So if you share storage instances across multiple files you'll get the error you're seeing
Unfortunately I can't think of a way to fix it that doesn't feel like a hack yet
For now I'd recommend using a separate storage instance for each flow and just importing the shared container name from your utils. I'll continue to explore a good way to fix this.
@Marvin open "`prefect register -p .` duplicates `storage.build()` calls for storage instances shared across files"
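A toy model of the failure described above. It assumes the CLI groups by storage instance one file at a time and that `build()` uploads every flow the storage object already knows about. `FakeBlobStorage` and `register_per_file` are hypothetical names for illustration, not Prefect's implementation.

```python
from collections import defaultdict


class FakeBlobStorage:
    """Hypothetical stand-in for blob storage; refuses to overwrite a blob."""

    def __init__(self, container):
        self.container = container
        self.flows = {}        # flow name -> blob name
        self.uploaded = set()  # blobs this instance has already uploaded

    def add_flow(self, name):
        self.flows[name] = f"{name}/blob"

    def build(self):
        # Uploads *every* known flow, including ones from earlier files.
        for blob in self.flows.values():
            if blob in self.uploaded:
                raise FileExistsError(f"The specified blob already exists: {blob}")
            self.uploaded.add(blob)


def register_per_file(files):
    """Group by storage instance within each file only, then build."""
    results = {}
    for path, flows in files.items():
        groups = defaultdict(list)
        for name, storage in flows:
            storage.add_flow(name)
            groups[storage].append(name)
        for storage, names in groups.items():
            try:
                storage.build()
                status = "registered"
            except FileExistsError:
                status = "errored"
            for name in names:
                results[name] = status
    return results


# One instance shared across two files: the second build re-uploads flow "a".
shared = FakeBlobStorage("foo")
shared_results = register_per_file(
    {"a.py": [("a", shared)], "b.py": [("b", shared)]}
)

# One instance per flow: each build only sees its own flow.
separate_results = register_per_file(
    {"a.py": [("a", FakeBlobStorage("foo"))], "b.py": [("b", FakeBlobStorage("foo"))]}
)
```

In this model the shared instance registers `a` and errors on `b`, while separate instances register both, which is why a fresh storage instance per flow sidesteps the problem.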
j
Got the same results from your basic flow examples. I guess I was wrong earlier when I thought all the flows still got uploaded; now I just see a in foo and not b.
prefect register --path flows/ --project prefect2
  shell: /bin/bash -e {0}
  env:
    ACR: tprefect.azurecr.io
    PROJECT: prefect2
    AZURE_STORAGE_CONNECTION_STRING: ***
    IMAGE_URL: tprefect.azurecr.io/prefect2:latest
a.py b.py
prefect2 already exists
Collecting flows...
Processing 'flows/a.py':
  Building `Azure` storage...
[2021-06-22 16:45:03+0000] INFO - prefect.Azure | Uploading a/2021-06-22t16-45-03-374828-00-00 to foo
  Registering 'a'... Done
  └── ID: 0c7cbbd9-38e8-4786-9125-29b88c81409e
  └── Version: 1
Processing 'flows/b.py':
  Building `Azure` storage...
[2021-06-22 16:45:04+0000] INFO - prefect.Azure | Uploading a/2021-06-22t16-45-03-374828-00-00 to foo
    Error building storage:
      Traceback (most recent call last):
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
    **kwargs)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 228, in upload
    map_error(status_code=response.status_code, response=response, error_map=error_map)
        File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 102, in map_error
    raise error
      azure.core.exceptions.ResourceExistsError: Operation returned an invalid status 'The specified blob already exists.'
      
During handling of the above exception, another exception occurred:

      Traceback (most recent call last):
        File "/usr/local/lib/python3.6/dist-packages/prefect/cli/build_register.py", line 451, in build_and_register
    storage.build()
        File "/usr/local/lib/python3.6/dist-packages/prefect/storage/azure.py", line 146, in build
    client.upload_blob(data)
        File "/usr/local/lib/python3.6/dist-packages/azure/core/tracing/decorator.py", line 83, in wrapper_use_tracer
    return func(*args, **kwargs)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_blob_client.py", line 693, in upload_blob
    return upload_block_blob(**options)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 157, in upload_block_blob
    process_storage_error(error)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_shared/response_handlers.py", line 150, in process_storage_error
    error.raise_with_traceback()
        File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 244, in raise_with_traceback
    raise super(AzureError, self).with_traceback(self.exc_traceback)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_upload_helpers.py", line 105, in upload_block_blob
    **kwargs)
        File "/usr/local/lib/python3.6/dist-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 228, in upload
    map_error(status_code=response.status_code, response=response, error_map=error_map)
        File "/usr/local/lib/python3.6/dist-packages/azure/core/exceptions.py", line 102, in map_error
    raise error
      azure.core.exceptions.ResourceExistsError: The specified blob already exists.
RequestId:f28d19fc-c01e-0060-3d85-67d599000000
Time:2021-06-22T16:45:04.9856152Z
ErrorCode:BlobAlreadyExists
Error:None

  Registering 'b'... Error
================== 1 registered, 1 errored ==================
10-4 thanks @Zanie
@Zanie I just made it a function that creates a new instance, and it now works.
# In utils.config:
from prefect.storage import Azure

def azure_store():
    return Azure(container="foo")

# In the flow file:
from prefect import Flow

from utils.config import azure_store, docker_run

with Flow("a") as flow:
    pass

flow.run_config = docker_run
flow.storage = azure_store()
z
Glad that works 👍 sorry about the hitch there
j
No worries, thanks for your help on this