Stéphan Taljaard
05/17/2022, 3:03 PM...
def create_conn_str(
    username: str, password: str, host: str, database: str, port=None, dialect="mssql", driver="pymssql"
) -> str:
    """Create a SQLAlchemy database URL connection string.

    The result has the form
    ``dialect[+driver]://username:password@host[:port]/database``.

    Args:
        username: Login name; URL-escaped with ``quote_plus``.
        password: Login password; URL-escaped with ``quote_plus``.
        host: Server host name or address, inserted as given.
        database: Database name, inserted as given.
        port: Optional port. Any falsy value (``None``, ``0``, ``""``)
            leaves the port out of the URL.
        dialect: SQLAlchemy dialect name.
        driver: DBAPI driver name. Pass ``None`` or ``""`` to omit the
            ``+driver`` part of the scheme.

    Returns:
        The assembled URL string.
    """
    # Without a driver the scheme is just the dialect; interpolating a missing
    # driver would otherwise yield an invalid scheme such as "mssql+None".
    scheme = f"{dialect}+{driver}" if driver else dialect
    # Only the credentials are escaped: they may hold characters such as
    # "@" or ":" that would break the URL structure.
    credentials = f"{quote_plus(username)}:{quote_plus(password)}"
    port_suffix = f":{port}" if port else ""
    return f"{scheme}://{credentials}@{host}{port_suffix}/{database}"
@task
def extract_data(tag: str, date_start, date_end, secret_value: str) -> pd.DataFrame:
    """Run the ``GetData`` stored procedure for one tag and return its rows.

    Args:
        tag: Value passed as the first argument of ``GetData``.
        date_start: Start of the range; must provide ``to_datetime_string()``.
        date_end: End of the range; must provide ``to_datetime_string()``.
        secret_value: JSON document with the keys ``user``, ``password``,
            ``database`` and ``host``.

    Returns:
        The query result as a DataFrame, with the ``Timestamp`` column
        parsed as dates.

    Raises:
        KeyError: If ``secret_value`` lacks one of the required keys.
    """
    secret = json.loads(secret_value)
    conn_str = create_conn_str(
        secret["user"], secret["password"], secret["host"], secret["database"]
    )

    # Bound parameters instead of string interpolation: a tag containing a
    # quote can no longer break (or inject into) the statement.
    query = sqlalchemy.text("EXEC GetData :tag, :start_date, :end_date")
    params = {
        "tag": tag,
        "start_date": date_start.to_datetime_string(),
        "end_date": date_end.to_datetime_string(),
    }

    engine = sqlalchemy.create_engine(conn_str)
    try:
        # parse_dates is given in its documented list form.
        return pd.read_sql_query(query, engine, params=params, parse_dates=["Timestamp"])
    finally:
        # Each call builds its own engine, so release its connection pool
        # here instead of leaving it open until garbage collection.
        engine.dispose()
...
# Flow definition (excerpt; the `...` lines are elisions in the snippet).
with Flow(...):
...
# Unpack the two values produced by generate_start_and_end_dates.
date_start, date_end = generate_start_and_end_dates(start_date, start_date_hours_delta, end_date)
...
# Map extract_data over its arguments, then pass the mapped results to
# combine_data.
# NOTE(review): none of the arguments is wrapped in `unmapped(...)`; if this is
# Prefect 1.x, confirm date_start, date_end and db_creds are meant to be
# iterated alongside tag_list.
df_list = extract_data.map(tag_list, date_start, date_end, db_creds)
single_df = combine_data(df_list)
...
Kevin Kho
Stéphan Taljaard
05/17/2022, 4:07 PM
Kevin Kho
Stéphan Taljaard
05/17/2022, 4:14 PM
@task(checkpoint=False)
# Build a SQLAlchemy engine from database credentials and return an open
# connection obtained from it.
# The `...` in the body is an elision in the snippet: `user`, `password`,
# `host` and `database` are not defined in the visible lines and presumably
# come from parsing `secret_value` there (TODO confirm).
def create_db_connection(secret_value: str) -> sqlalchemy.engine.base.Connection:
...
conn_str = create_conn_str(user, password, host, database)
engine = sqlalchemy.create_engine(conn_str)
# NOTE(review): only the Connection is returned; the Engine is not kept or
# disposed anywhere in the visible code.
return engine.connect()
@task(checkpoint=False)
def close_db_connection(
    connection: sqlalchemy.engine.base.Connection,
    dummy_to_preserve_order,
):
    """Close ``connection``.

    ``dummy_to_preserve_order`` is accepted but never read. Its name suggests
    it exists only so a caller can pass an upstream result and have this task
    ordered after it (assumption from the name; confirm against the flow).
    """
    connection.close()
# Read a query result into a DataFrame using an already-open connection.
# The task is configured with max_retries=3 and a retry_delay of 7 minutes.
@task(max_retries=3, retry_delay=timedelta(minutes=7))
def extract_data(
tag: Union[None, str], date_start, date_end, connection: sqlalchemy.engine.base.Connection
) -> pd.DataFrame:
# The `...` below is an elision in the snippet: `query` is not defined in the
# visible lines and is presumably built there from tag/date_start/date_end
# (TODO confirm).
...
# NOTE(review): pandas documents parse_dates as a list or dict of column
# names; a bare string is passed here - confirm it behaves as intended.
return pd.read_sql_query(query, connection, parse_dates="Timestamp")
...
# Flow definition (excerpt; the `...` lines are elisions in the snippet).
# One connection is created, handed to the mapped extract_data call, and then
# handed to close_db_connection.
with Flow(...):
...
db_connection = create_db_connection(secret)
# NOTE(review): db_connection is passed to .map without `unmapped(...)`; if
# this is Prefect 1.x, confirm the single connection is meant to be shared by
# every mapped run rather than iterated over.
df_list = extract_data.map(tag_list, date_start, date_end, db_connection)
# df_list is passed as the second argument (dummy_to_preserve_order), which
# close_db_connection does not read.
# NOTE(review): presumably this orders the close after extraction; confirm the
# connection is also closed when an extract_data run fails.
close_db_connection(db_connection, df_list)
single_df = combine_data(df_list)
....
Kevin Kho