Maity
10/08/2023, 10:53 PMimport os
from prefect import task
from pydantic import SecretStr
from requests_toolbelt.sessions import BaseUrlSession
from prefect.blocks.core import Block
class UppApiCredentials(Block):
api_url: str
api_key: SecretStr
UPP_API_CREDENTIALS_BLOCK = os.getenv("UPP_API_CREDENTIALS_BLOCK")
# get api details
API_URL = os.getenv("API_URL")
API_KEY = os.getenv("API_KEY")
# if the block env variable was specified, load the credentials from that
if UPP_API_CREDENTIALS_BLOCK:
creds: UppApiCredentials = UppApiCredentials.load(UPP_API_CREDENTIALS_BLOCK)
API_URL = creds.api_url
API_KEY = creds.api_key.get_secret_value()
session = BaseUrlSession(API_URL)
session.headers.update({"x-api-key": API_KEY})
@task(
# cache_expiration=timedelta(hours=1),
# cache_key_fn=task_input_hash,
)
def get_endpoint(endpoint: str, params: dict = None):
params = (
{k: v for k, v in params.items() if v is not None}
if params is not None
else None
)
response = session.get(
f"{API_URL}/{endpoint}",
params=params,
)
response.raise_for_status()
return response.json()
Maity
10/08/2023, 11:54 PMimport os
from prefect.blocks.core import Block
from pydantic import SecretStr
UPP_API_CREDENTIALS_BLOCK = os.getenv("UPP_API_CREDENTIALS_BLOCK")
__DATA_UPP_BLOCK = os.getenv("__DATA_UPP_BLOCK")
class UppApiCredentials(Block):
api_url: str = os.getenv("API_URL")
api_key: SecretStr = os.getenv("API_KEY")
upp_api_credentials = (
UppApiCredentials.load(UPP_API_CREDENTIALS_BLOCK)
if UPP_API_CREDENTIALS_BLOCK
else UppApiCredentials()
)
class _DataUpp(Block):
logging_sql_server: str = os.getenv("LOGGING_SQL_SERVER")
logging_sql_username: str = os.getenv("LOGGING_SQL_USER")
logging_sql_password: SecretStr = os.getenv("LOGGING_SQL_PASSWORD")
logging_sql_database: str = os.getenv("LOGGING_SQL_DATABASE")
logging_sql_table: str = os.getenv(
"LOGGING_SQL_TABLE", "UPP_Viewpoint_Integration_Logs"
)
__data_upp = (
_DataUpp.load(__DATA_UPP_BLOCK)
if __DATA_UPP_BLOCK
else _DataUpp()
)
Jan Malek
10/09/2023, 9:24 AM__data_pp = (__DATA_UPP_BLOCK and _DataUpp.load(__DATA_UPP_BLOCK)) or _DataUpp()
Maity
10/09/2023, 10:46 PM