
Bob De Schutter

05/20/2022, 2:58 PM
Hello, I'm trying to register flows that are generated dynamically from a YAML file and stored in a list. In short, I'm using code like this (flows is a plain Python list of Flow objects):
for flow in flows:
    flow.register(
        project_name=f"Data Platform - {os.environ.get('PREFECT_TARGET_ENV', 'Develop')}",
        add_default_labels=False,
        set_schedule_active=os.environ.get('PREFECT_TARGET_ENV', 'Develop') == 'Production'
    )
This has worked for me before but since I upgraded prefect to 1.2.1, I'm getting an error message related to the Azure storage I'm using for the flows:
azure.core.exceptions.ResourceExistsError: The specified blob already exists
Any idea why this strategy is not working anymore?

Kevin Kho

05/20/2022, 3:00 PM
It looks like you're using Azure storage; the first registration already uploaded a file at that path, and the second one won't overwrite it.

Bob De Schutter

05/20/2022, 3:01 PM
But why is it uploading the same file twice? The generated flows have different names…
[2022-05-20 16:54:23+0200] INFO - prefect.Azure | Uploading dbt-datawarehouse-daily/2022-05-20t14-54-22-329817-00-00 to datawarehouse-prefect-flows
Flow URL: <https://cloud.prefect.io/gv-energy-group/flow/4c3ff3d7-ac87-43f4-bfe8-ded70fb20b22>
 └── ID: c759911b-fb92-493e-90e4-1925f0663d44
 └── Project: Data Platform - Develop
 └── Labels: []
[2022-05-20 16:54:25+0200] INFO - prefect.Azure | Uploading dbt-datawarehouse-daily/2022-05-20t14-54-22-329817-00-00 to datawarehouse-prefect-flows
Traceback (most recent call last):
So the first flow that gets generated is successfully registered and uploaded, but then the second flow in the list is uploaded under the previous flow's name for some reason, even though its own name is different 🙂

Kevin Kho

05/20/2022, 3:02 PM
How did you define storage?

Bob De Schutter

05/20/2022, 3:03 PM
STORAGE=Azure(
    container="datawarehouse-prefect-flows",
    connection_string_secret="azure-container-connection-string"
)
and the flows are created like this in a loop:
with Flow(
    name=f"dbt-{flow['name']}",
    storage=STORAGE,
    run_config=DockerRun(image=DOCKER_IMAGE),
    schedule=schedule,
) as dbt_flow:
The strange thing is that this exact same code was working before…

Kevin Kho

05/20/2022, 3:05 PM
I get what you are saying, but this is hard to diagnose. I think the approach is right; I would just add more log statements in that loop to see which flow is being registered.

Bob De Schutter

05/20/2022, 3:08 PM
I quickly added a print statement just before the register call in the loop:
<Flow: name="dbt-datawarehouse-daily">
[2022-05-20 17:08:01+0200] INFO - prefect.Azure | Uploading dbt-datawarehouse-daily/2022-05-20t15-08-00-758148-00-00 to datawarehouse-prefect-flows
Flow URL: <https://cloud.prefect.io/gv-energy-group/flow/4c3ff3d7-ac87-43f4-bfe8-ded70fb20b22>
 └── ID: b3c85743-d98e-4fd4-9dde-f24ec1a8a0a0
 └── Project: Data Platform - Develop
 └── Labels: []
<Flow: name="dbt-smartreporting">
[2022-05-20 17:08:03+0200] INFO - prefect.Azure | Uploading dbt-datawarehouse-daily/2022-05-20t15-08-00-758148-00-00 to datawarehouse-prefect-flows
Traceback (most recent call last):
So the second name is clearly different, but Prefect still tries to upload it to a blob with the previous flow's name for some reason 🤔

Kevin Kho

05/20/2022, 3:11 PM
Can you try printing
flow.storage
maybe?

Bob De Schutter

05/20/2022, 3:17 PM
That just gives
Storage: Azure
twice; storage.blob_name and storage.flows are also empty.
OK, I found it... I was creating the storage object outside of the loop. I changed it to create a new one on every loop iteration and now it works correctly.
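For readers hitting the same error: the fix described above is to construct `Azure(...)` inside the loop, so every flow gets its own storage instance. The pitfall can be sketched without Prefect installed (`FakeStorage` is a hypothetical stand-in, under the assumption that the real storage object caches a blob name after its first build):

```python
# Minimal sketch of the bug and the fix, with no Prefect dependency.
# FakeStorage is a hypothetical stand-in for prefect.storage.Azure.

class FakeStorage:
    def __init__(self):
        self.blob_name = None

    def build(self, flow_name):
        # Assumed behavior: the blob name is only generated on the first
        # build, so a reused instance keeps the first flow's name.
        if self.blob_name is None:
            self.blob_name = f"{flow_name}/some-timestamp"
        return self.blob_name

# Bug: one instance shared across the loop -> both flows get the first name.
shared = FakeStorage()
shared_names = [shared.build(name) for name in ["dbt-a", "dbt-b"]]

# Fix: a fresh instance per iteration -> each flow gets its own name.
fresh_names = [FakeStorage().build(name) for name in ["dbt-a", "dbt-b"]]
```

In the real code, that means moving the `STORAGE = Azure(container=..., connection_string_secret=...)` line into the loop body that builds each flow.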

Kevin Kho

05/20/2022, 3:21 PM
Ah ok glad you figured it out!

Bob De Schutter

05/20/2022, 3:21 PM
I didn't think about it, but it makes sense... However, it's strange that this worked before 🤔
Thanks for your help anyway 🙂
👍 1

Kevin

07/07/2022, 5:57 PM
@Kevin Kho @Bob De Schutter - I'm running into a similar issue. It started randomly (I think), as it had worked a few times before. I am simply trying to register flows using globbing:
prefect register -p $GITHUB_WORKSPACE/orchestrate/flows/**/* --project $PREFECT_PROJECT
The first flow registration succeeds, but all following flows fail with ErrorCode:BlobAlreadyExists - it's trying to register all of the flows using the blob name from the first registered flow:
Collecting flows...
Processing '/home/runner/work/analytics/analytics/orchestrate/flows/salesforce/salesforce.py':
  Building `Azure` storage...
[2022-07-07 17:52:24+0000] INFO - prefect.Azure | Uploading salesforce/2022-07-07t17-52-24-299152-00-00 to prefect
  Registering 'salesforce'... Done
  └── ID: 69ac0015-cccc-43d0-9187-a1be71fc2238
  └── Version: 5
Processing '/home/runner/work/analytics/analytics/orchestrate/flows/deployment/deploy_new_source.py':
  Building `Azure` storage...
[2022-07-07 17:52:26+0000] INFO - prefect.Azure | Uploading salesforce/2022-07-07t17-52-24-299152-00-00 to prefect
    Error building storage:
      Traceback (most recent call last):
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/prefect/cli/build_register.py", line 463, in build_and_register
          storage.build()
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/prefect/storage/azure.py", line 155, in build
          client.upload_blob(data, overwrite=self.overwrite)
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
          return func(*args, **kwargs)
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_blob_client.py", line 728, in upload_blob
          return upload_block_blob(**options)
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_upload_helpers.py", line 177, in upload_block_blob
          process_storage_error(error)
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_shared/response_handlers.py", line 181, in process_storage_error
          exec("raise error from None")   # pylint: disable=exec-used # nosec
        File "<string>", line 1, in <module>
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_upload_helpers.py", line 115, in upload_block_blob
          **kwargs)
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
          return func(*args, **kwargs)
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/storage/blob/_generated/operations/_block_blob_operations.py", line 801, in upload
          map_error(status_code=response.status_code, response=response, error_map=error_map)
        File "/opt/hostedtoolcache/Python/3.7.13/x64/lib/python3.7/site-packages/azure/core/exceptions.py", line 107, in map_error
          raise error
      azure.core.exceptions.ResourceExistsError: The specified blob already exists.
RequestId:80beb2d6-701e-0067-262a-925d68000000
Time:2022-07-07T17:52:26.1641349Z
ErrorCode:BlobAlreadyExists
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>BlobAlreadyExists</Code><Message>The specified blob already exists.
RequestId:80beb2d6-701e-0067-262a-925d68000000
Time:2022-07-07T17:52:26.1641349Z</Message></Error>

Kevin Kho

07/07/2022, 6:32 PM
Can I see an example of how you attach storage?

Kevin

07/07/2022, 6:54 PM
I ended up looping through the files and registering them individually with no issues.
In its own module, I have the following:
from prefect.storage import Azure
from prefect.run_configs import KubernetesRun
import os

azure_store = Azure(container="prefect", connection_string_secret="AZ_CONNECTION_STRING")
And then I import that into each flow and use it like:
with Flow(
    'salesforce',
    run_config=kubernetes_run,
    storage=azure_store,
) as flow:
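Worth noting: a single module-level `azure_store` imported by every flow is the same shared-instance pattern discussed earlier in the thread. One hedged alternative is a factory function, so each flow module builds its own storage object (`make_storage` and the `Storage` class are illustrative names; the real body would return `Azure(...)`):

```python
class Storage:
    """Hypothetical stand-in for prefect.storage.Azure."""
    def __init__(self, container, connection_string_secret):
        self.container = container
        self.connection_string_secret = connection_string_secret

def make_storage():
    # Each flow module calls this, so no two flows ever share one instance
    # and one flow's build cannot leak state into the next.
    return Storage(
        container="prefect",
        connection_string_secret="AZ_CONNECTION_STRING",
    )

# Two calls produce two independent objects, unlike a shared module constant.
a, b = make_storage(), make_storage()
```

Each flow would then use `storage=make_storage()` instead of importing the shared `azure_store` constant.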