# Brian Newman, 01/18/2024, 4:10 PM
# Brian Newman
# 01/18/2024, 4:11 PM
# NOTE(review): the return value of this task contains the raw storage account
# key, and it is cached via `result_storage` for up to a day. Confirm that
# persisting the key into the "stunifiedintegrations-prefect-cache" storage is
# acceptable; otherwise drop caching here or cache only non-secret data.
@task(
    cache_key_fn=cache_with_storage,
    result_storage=Azure.load("stunifiedintegrations-prefect-cache"),
    retries=3,
    cache_expiration=timedelta(days=1),
    task_run_name=generate_task_name,
)
async def get_delta_table_storage_options() -> dict[str, str]:
    """
    Asynchronously get storage options for a DeltaTable from a storage account.

    The storage account and the secret holding its access key are chosen from
    ``settings.ENVIRONMENT``, which must be ``"dev"`` or ``"prod"``.

    Returns:
        dict[str, str]: Storage options with the keys
        ``azure_storage_account_name`` and ``azure_storage_account_key``.

    Raises:
        ValueError: If the environment is neither "dev" nor "prod", or if the
            secret lookup returned no (or an empty) access key.
    """
    environment = settings.ENVIRONMENT
    # The environments are mutually exclusive, so resolve the account and the
    # name of the secret holding its key in a single if/elif chain.
    if environment == "dev":
        data_lake_account = PrefectAzureBlobStorageCredentials.dev
        secret_key = PrefectSecrets.DEV_ACCOUNT_KEY
    elif environment == "prod":
        data_lake_account = PrefectAzureBlobStorageCredentials.prod
        secret_key = PrefectSecrets.PROD_ACCOUNT_KEY
    else:
        raise ValueError(
            "Unable to get storage account credentials. Check that the environment is set correctly."
        )

    # `.fn` awaits the function wrapped by the `fetch_secret` task directly
    # (Prefect `Task.fn`) instead of submitting it as a separate task run.
    access_key = await fetch_secret.fn(key=secret_key)
    # An empty key is as unusable as a missing one; fail here rather than hand
    # invalid credentials to the storage client.
    if not access_key:
        raise ValueError(
            "Unable to get storage account credentials. Check that the environment is set correctly."
        )
    return {
        "azure_storage_account_name": data_lake_account.value,
        "azure_storage_account_key": access_key,
    }
@flow(
    log_prints=True,
    name=f"get_delta_table::{settings.ENVIRONMENT}",
    flow_run_name=f"get_delta_table::{settings.ENVIRONMENT}",
    # NOTE(review): this flow-level retry count stacks on top of the
    # storage-options task's own `retries=3`; confirm the total is intended.
    retries=3,
    validate_parameters=False,
)
async def get_delta_table(storage_account_path: str) -> DeltaTable:
    """Get a DeltaTable object from a storage account path.

    Credentials are obtained from ``get_delta_table_storage_options`` and
    passed to the ``DeltaTable`` constructor as ``storage_options``.

    Args:
        storage_account_path: Location of the Delta table in the storage
            account (presumably an Azure URI such as ``abfss://...``; confirm
            against callers).

    Returns:
        DeltaTable: The opened table. Its path and version are logged.
    """
    storage_options = await get_delta_table_storage_options()
    result = DeltaTable(storage_account_path, storage_options=storage_options)
    logger.info(f"Successfully accessed DeltaTable at {storage_account_path}.")
    logger.info(f"DeltaTable Version: {result.version()}")
    return result