
Gustavo de Paula

07/18/2023, 1:09 PM
Hi everyone! I'm trying to use the Azure Storage block with Azure Managed Identity on AKS, but I get a "no authentication information" error. The managed identity's default credential works if I use it directly in a flow. Is it necessary to explicitly pass a connection string or client secret to the storage block, or should it also work with managed identity via the default credential integration?
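For reference, something like this works fine inside a flow (just a sketch; the account URL and container/blob names are placeholders):
```python
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from prefect import flow


@flow
def read_blob():
    # DefaultAzureCredential picks up the AKS managed identity automatically
    credential = DefaultAzureCredential()
    client = BlobServiceClient(
        account_url="https://<account>.blob.core.windows.net",  # placeholder
        credential=credential,
    )
    return client.get_blob_client("my-container", "file.csv").download_blob().readall()
```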

Christopher Boyd

07/18/2023, 3:29 PM
Hi Gustavo - do you have an example? I use Azure Managed Identity like this:
I do use the AZURE_STORAGE_CONNECTION_STRING env var in the code here, but you can also attach the identity directly to the pod using workload identity, which should work just as well (that part is configured outside of Prefect).
```python
from prefect import task, flow, get_run_logger
import pandas as pd
import os
from io import BytesIO

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download, blob_storage_upload

def azure_creds():
    # Build a credentials block from the connection string in the environment
    connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    return AzureBlobStorageCredentials(connection_string=connection_string)

def load_from_azure():
    # Download the raw blob bytes with the prefect_azure download task
    blob_storage_credentials = azure_creds()
    data = blob_storage_download(
        blob="file.csv",
        container="prefect-logs",
        blob_storage_credentials=blob_storage_credentials,
    )
    return data


@task
def read_file(data):
    return pd.read_csv(BytesIO(data))


def write_df(data):
    df = pd.DataFrame(data, columns=['output'])
    csv_data = df.to_csv().encode()  # encode to bytes for upload
    blob = blob_storage_upload(
        data=csv_data,
        container="prefect-logs",
        blob="csv_data",
        blob_storage_credentials=azure_creds(),
        overwrite=False
    )
    return blob


@task
def transform_pd(df):
    results = [row['col1'] * row['col2'] for _, row in df.iterrows()]
    get_run_logger().info(f"{results=}")
    return results


@flow(log_prints=True)
def transform_flow():
    file = load_from_azure()
    df = read_file(file)
    transformed_output = transform_pd(df)
    write_df(transformed_output)

if __name__ == "__main__":
    transform_flow()
```
If the identity exists on the pod, you should be able to load credentials with DefaultAzureCredential, which tries managed/workload identity first.
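Roughly like this - assuming a prefect-azure version whose credentials block supports account_url (newer releases do) and falls back to DefaultAzureCredential when no connection string is set. The account URL and block name are placeholders:
```python
from prefect_azure import AzureBlobStorageCredentials

# No connection string here: authentication falls back to DefaultAzureCredential,
# which tries managed/workload identity before other credential types
creds = AzureBlobStorageCredentials(
    account_url="https://<account>.blob.core.windows.net",  # placeholder
)
creds.save("blob-creds", overwrite=True)  # optional: persist as a reusable block
```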