matt_innerspace.io
07/07/2023, 8:15 PM
azure storage account - I can't figure out how to properly connect to an Azure blob store. From what I can tell, I need this to be able to deploy code (which lives in Bitbucket) to a remote worker pool.
I've entered what should be enough information to make it work, but it fails:
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '401 Unauthorized' for url 'https://api.prefect.cloud/api/accounts/<my account>/workspaces/<my workspace>/block_types/slug/azure/block_documents/name/azure-block-001?include_secrets=true'
Response: {'detail': 'Invalid authentication credentials'}
For more information check: https://httpstatuses.com/401
Is there a document somewhere that describes how to do this?
Christopher Boyd
07/07/2023, 8:22 PM
matt_innerspace.io
07/07/2023, 8:29 PM
Azure block here. Is what you mentioned above different than that?
then trying to deploy it:
prefect deployment build --storage-block azure/azure-block-001/health_check --name health-test --pool default-agent-pool --work-queue aci-test --apply sample/health_flow.py:health_check_flow
Christopher Boyd
07/07/2023, 8:37 PM
pip install prefect-azure
and pip install prefect-azure[blob-storage]
prefect block register -m prefect_azure
Then they will be available in the UI for you to configure
matt_innerspace.io
07/07/2023, 8:40 PM
(.venv) mattm ~/dev/workflows$ prefect block register -m prefect_azure
Warning! Failed to load collection 'prefect_azure': ModuleNotFoundError: No module named 'prefect.workers'
Unable to load prefect_azure. Please make sure the module is installed in your current environment.
Christopher Boyd
07/07/2023, 8:41 PM
matt_innerspace.io
07/07/2023, 8:42 PM
WARNING: prefect-azure 0.2.7 does not provide the extra 'blob-storage'
Christopher Boyd
07/07/2023, 8:43 PM
matt_innerspace.io
07/07/2023, 8:44 PM
WARNING: prefect-azure 0.2.10 does not provide the extra 'blob-storage'
pip install prefect-azure[blob-storage]
Christopher Boyd
07/07/2023, 8:44 PM
matt_innerspace.io
07/07/2023, 8:46 PM
(.venv) mattm ~/dev/workflows$ prefect block register -m prefect_azure
Warning! Failed to load collection 'prefect_azure': ModuleNotFoundError: No module named 'prefect.workers'
Unable to load prefect_azure. Please make sure the module is installed in your current environment.
Christopher Boyd
07/07/2023, 8:46 PM
matt_innerspace.io
07/07/2023, 8:46 PM
(.venv) mattm ~/dev/workflows$ prefect --version
Warning! Failed to load collection 'prefect_azure': ModuleNotFoundError: No module named 'prefect.workers'
2.82
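The `ModuleNotFoundError` above suggests that the installed Prefect core does not ship the `prefect.workers` package that this `prefect-azure` release tries to import, so the two packages are probably out of step. A minimal sketch for confirming that from Python, using only the standard library. The helper names `installed_version` and `has_module` are hypothetical and not part of Prefect's API.

```python
from importlib import metadata, util
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def has_module(module_name: str) -> bool:
    """Report whether a (possibly dotted) module can be located.

    find_spec imports any parent packages, but not the target module itself.
    """
    try:
        return util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (for example "prefect") is missing.
        return False


if __name__ == "__main__":
    for dist in ("prefect", "prefect-azure"):
        print(f"{dist}: {installed_version(dist)}")
    print("prefect.workers importable:", has_module("prefect.workers"))
```

If the last line prints `False` while `prefect-azure` is installed, upgrading Prefect in the same virtual environment (or pinning an older `prefect-azure`) is the likely next step.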
Christopher Boyd
07/07/2023, 8:48 PM
matt_innerspace.io
07/07/2023, 8:49 PM
Christopher Boyd
07/07/2023, 8:49 PM
from prefect import task, flow
from prefect import get_run_logger
import pandas as pd
import os
from io import BytesIO
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download, blob_storage_upload


def azure_creds():
    # Prefer a saved credentials block; fall back to an environment variable.
    try:
        azure_credentials_block = AzureBlobStorageCredentials.load("boydoblobbo")
        return azure_credentials_block
    except ValueError as e:
        get_run_logger().info(f"No azure_credentials_block found :{e}")
    try:
        connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
        return AzureBlobStorageCredentials(connection_string=connection_string)
    except Exception:
        get_run_logger().info("No connection string found")
        raise


def load_from_azure():
    # Download the raw bytes of file.csv from the prefect-logs container.
    blob_storage_credentials = azure_creds()
    data = blob_storage_download(
        blob="file.csv",
        container="prefect-logs",
        blob_storage_credentials=blob_storage_credentials,
    )
    return data


@task
def read_file(data):
    # Parse the downloaded bytes as CSV.
    return pd.read_csv(BytesIO(data))


def write_df(data):
    # Serialize the results to CSV and upload them back to the same container.
    df = pd.DataFrame(data, columns=["output"])
    csv_data = df.to_csv()
    blob = blob_storage_upload(
        data=csv_data,
        container="prefect-logs",
        blob="csv_data",
        blob_storage_credentials=azure_creds(),
        overwrite=True,
    )
    return blob


@task
def transform_pd(df):
    # Multiply col1 by col2 for every row.
    results = [row["col1"] * row["col2"] for index, row in df.iterrows()]
    get_run_logger().info(f"{results=}")
    return results


@flow(log_prints=True)
def transform_flow():
    file = load_from_azure()
    df = read_file(file)
    transformed_output = transform_pd(df)
    write_df(transformed_output)


if __name__ == "__main__":
    transform_flow()
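The flow above loads a saved credentials block by name and only falls back to `AZURE_STORAGE_CONNECTION_STRING` when that fails. A minimal sketch of how such a block might be created up front, assuming `prefect-azure` is installed and that the connection string is supplied through that same environment variable. The helper names `require_connection_string` and `save_credentials_block` are hypothetical. The `save(name, overwrite=True)` call is the generic Prefect 2 block method as I understand it, so treat the exact call as an assumption to verify against your installed version.

```python
import os


def require_connection_string(env_var: str = "AZURE_STORAGE_CONNECTION_STRING") -> str:
    """Return the connection string from the environment, failing fast if it is unset."""
    value = os.getenv(env_var, "").strip()
    if not value:
        raise RuntimeError(f"{env_var} is not set")
    return value


def save_credentials_block(block_name: str) -> None:
    """Create (or overwrite) an AzureBlobStorageCredentials block with the given name."""
    connection_string = require_connection_string()
    # Imported lazily so the check above still works without prefect-azure installed.
    from prefect_azure import AzureBlobStorageCredentials

    AzureBlobStorageCredentials(connection_string=connection_string).save(
        block_name, overwrite=True
    )
```

Calling `save_credentials_block("boydoblobbo")` once, with the environment variable exported, should let the `AzureBlobStorageCredentials.load("boydoblobbo")` call in the flow succeed. Failing fast on a missing variable avoids building a credentials object around `None`.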
matt_innerspace.io
07/07/2023, 8:52 PM
prefect deployment build --storage-block azure/azure-block-001/health_check --name health-test --pool default-agent-pool --work-queue aci-test --apply health_flow.py:health_check_flow
Is this not the right way to do it?
Christopher Boyd
07/07/2023, 8:54 PM
matt_innerspace.io
07/07/2023, 8:56 PM
Christopher Boyd
07/07/2023, 8:56 PM
prefect init
matt_innerspace.io
07/07/2023, 8:58 PM
Christopher Boyd
07/07/2023, 8:58 PM
prefect init will allow you to choose from an interactive process. Since you’re using Azure, it will set up your prefect.yaml for you.
You’ll likely still need to create an AzureBlobStorageCredentials block, which should just require a connection string.
Then the command would just be prefect deploy, and it will walk you interactively through the process of your deployment. You can certainly pass everything via the CLI if you want and know the values, but this makes it much simpler to follow.
matt_innerspace.io
07/07/2023, 9:04 PM
Christopher Boyd
07/07/2023, 9:05 PM
matt_innerspace.io
07/07/2023, 9:06 PM
Christopher Boyd
07/07/2023, 9:09 PM
prefect deploy
provide your flow, and it will go to the storage you selected and pull from the storage you selected
matt_innerspace.io
07/10/2023, 6:31 PM
Christopher Boyd
07/10/2023, 6:38 PM
matt_innerspace.io
07/10/2023, 6:56 PM