Samuel Kohlleffel
07/06/2021, 9:07 PM
For `BlobStorageUpload`, is there any way to set `overwrite=True` so that the files in the container are overwritten with the newly uploaded files when my flow runs?
Kevin Kho
Kevin Kho
import uuid
import azure.storage.blob
from prefect import Task
from prefect.client import Secret
from prefect.utilities.tasks import defaults_from_attrs
class MyBlobStorageUpload(Task):
    """
    Task for uploading string data to an Azure Blob Storage container, with
    control over whether an existing blob of the same name is replaced.

    Every argument given here acts as a default for `run` and can be
    overridden per call.

    Args:
        - azure_credentials_secret (str, optional): name of the Prefect Secret
            that stores the Azure Storage connection string; defaults to
            "AZ_CONNECTION_STRING"
        - container (str, optional): name of the target Blob Storage container
        - overwrite (bool, optional): if `True`, an existing blob with the same
            name is overwritten; defaults to `False`
        - **kwargs: additional keyword arguments passed to the `Task`
            constructor
    """

    def __init__(
        self,
        azure_credentials_secret: str = "AZ_CONNECTION_STRING",
        container: str = None,
        overwrite: bool = False,
        **kwargs
    ) -> None:
        self.azure_credentials_secret = azure_credentials_secret
        self.container = container
        self.overwrite = overwrite
        super().__init__(**kwargs)

    # `defaults_from_attrs` fills any listed argument that the caller did not
    # pass by keyword with the same-named attribute set in `__init__`.
    @defaults_from_attrs("azure_credentials_secret", "container", "overwrite")
    def run(
        self,
        data: str,
        blob_name: str = None,
        azure_credentials_secret: str = "AZ_CONNECTION_STRING",
        container: str = None,
        overwrite: bool = False,
    ) -> str:
        """
        Upload `data` to a blob and return the blob's name.

        Args:
            - data (str): the content to upload
            - blob_name (str, optional): name of the blob to write; a random
                UUID4 string is generated when omitted
            - azure_credentials_secret (str, optional): name of the Prefect
                Secret that stores the Azure Storage connection string
            - container (str, optional): name of the target container
            - overwrite (bool, optional): forwarded to
                `BlobClient.upload_blob`; when `False` the Azure SDK is
                expected to reject the upload if the blob already exists

        Returns:
            - str: the name of the blob that was written

        Raises:
            - ValueError: if no container name was provided at init or run time
        """
        if container is None:
            raise ValueError("A container name must be provided.")

        # Resolve the connection string from the named Secret and build a client.
        azure_credentials = Secret(azure_credentials_secret).get()
        blob_service = azure.storage.blob.BlobServiceClient.from_connection_string(
            conn_str=azure_credentials
        )

        # Generate a unique blob name when the caller did not supply one.
        if blob_name is None:
            blob_name = str(uuid.uuid4())

        client = blob_service.get_blob_client(container=container, blob=blob_name)
        client.upload_blob(data, overwrite=overwrite)

        return blob_name
Samuel Kohlleffel
07/07/2021, 12:23 PM