
Stéphan Taljaard

05/17/2022, 3:03 PM
Hi. I was wondering how everyone tackles mapped SQL queries in Prefect 1.0. Mapping like the example in the thread makes the flow slow, because the database connection is re-established for each mapped run. How do you do this using a single connection?
...

def create_conn_str(
    username: str, password: str, host: str, database: str, port=None, dialect="mssql", driver="pymssql"
) -> str:
    """Create a SQLAlchemy database URL connection string"""
    username_ = quote_plus(username)
    password_ = quote_plus(password)
    port_ = f":{port}" if port else ""
    return f"{dialect}+{driver}://{username_}:{password_}@{host}{port_}/{database}"


@task
def extract_data(tag: str, date_start, date_end, secret_value: str) -> pd.DataFrame:
    secret = json.loads(secret_value)
    user = secret["user"]
    password = secret["password"]
    database = secret["database"]
    host = secret["host"]

    conn_str = create_conn_str(user, password, host, database)
    engine = sqlalchemy.create_engine(conn_str)
    
    start_date = date_start.to_datetime_string()
    end_date = date_end.to_datetime_string()
    query = f"EXEC GetData '{tag}', '{start_date}', '{end_date}'"
    df = pd.read_sql_query(query, engine, parse_dates=["Timestamp"])
    return df

...

with Flow(...):
    ...
    date_start, date_end = generate_start_and_end_dates(start_date, start_date_hours_delta, end_date)
    ...
    df_list = extract_data.map(tag_list, date_start, date_end, db_creds)

    single_df = combine_data(df_list)
    ...
Replication tools like Airbyte come to mind. However, the database table has many other "tags" - full database/table replication is not feasible. It's not a table I have control over.
https://github.com/PrefectHQ/prefect/issues/1876 is an option. I was hoping to hear how the community approaches this problem.

Kevin Kho

05/17/2022, 3:59 PM
I guess it wouldn’t work with DaskExecutor, because a live connection can’t be serialized and sent to the workers. But if you turn off checkpointing, you could return a connection from a task and pass it to downstream tasks
And maybe LocalDaskExecutor with threads would work
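A minimal sketch of why threads can share one connection, using only the Python standard library as a stand-in (this is not Prefect's API; the `readings` table, the `extract` helper, and the lock are assumptions made for illustration). All worker threads live in one process, so the same connection object is visible to each of them and no serialization is needed:

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

# One connection for the whole run. check_same_thread=False allows
# threads other than the creating one to use it.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE readings (tag TEXT, value REAL)")
conn.executemany(
    "INSERT INTO readings VALUES (?, ?)",
    [("a", 1.0), ("a", 2.0), ("b", 5.0)],
)
conn.commit()

# Hypothetical guard: only one thread talks to the connection at a time.
conn_lock = threading.Lock()


def extract(tag):
    """Run one query per tag on the shared connection."""
    with conn_lock:
        cursor = conn.execute(
            "SELECT value FROM readings WHERE tag = ? ORDER BY value", (tag,)
        )
        return cursor.fetchall()


# Map the query over the tags with a thread pool, reusing `conn` throughout.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(extract, ["a", "b", "c"]))
```

The lock means the queries still run one after another, so any saving in this sketch comes from skipping the repeated connection setup rather than from parallel queries.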

Stéphan Taljaard

05/17/2022, 4:07 PM
It's indeed faster mapping with LocalDaskExecutor, but still unbearably slow. I can try your suggestion of disabling checkpointing

Kevin Kho

05/17/2022, 4:10 PM
Checkpointing won’t speed it up. It will just prevent serialization
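The serialization point can be illustrated with a small standard-library sketch. It assumes a `sqlite3` connection as a stand-in for the SQLAlchemy one, and `can_pickle` is a hypothetical helper. Plain query results pickle fine, while a live connection object does not, which is why a task returning a connection should not have its result serialized:

```python
import pickle
import sqlite3

conn = sqlite3.connect(":memory:")


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Ordinary data, such as fetched rows, can be serialized.
rows_ok = can_pickle([("a", 1.0)])

# A live connection cannot be pickled.
conn_ok = can_pickle(conn)

conn.close()
```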

Stéphan Taljaard

05/17/2022, 4:14 PM
Not checkpointing, along with the linked suggestion from GitHub, might be the easiest way to speed it up
Runtime decreased from 60 minutes to 14 minutes:
@task(checkpoint=False)
def create_db_connection(secret_value: str) -> sqlalchemy.engine.base.Connection:
    ...
    conn_str = create_conn_str(user, password, host, database)
    engine = sqlalchemy.create_engine(conn_str)
    return engine.connect()

@task(checkpoint=False)
def close_db_connection(connection: sqlalchemy.engine.base.Connection, dummy_to_preserve_order):
    connection.close()

@task(max_retries=3, retry_delay=timedelta(minutes=7))
def extract_data(
    tag: Union[None, str], date_start, date_end, connection: sqlalchemy.engine.base.Connection
) -> pd.DataFrame:
    ...
    return pd.read_sql_query(query, connection, parse_dates=["Timestamp"])

...

with Flow(...):
    ...
    db_connection = create_db_connection(secret)
    df_list = extract_data.map(tag_list, date_start, date_end, db_connection)
    close_db_connection(db_connection, df_list)
    single_df = combine_data(df_list)
    ....
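A possible alternative to passing the connection between tasks, sketched with the standard library only: cache the connection inside the worker process so that repeated calls reuse it. This is an assumption for illustration and not something proposed in the thread; `get_connection` and `extract` are hypothetical names, and `sqlite3` stands in for the real database driver.

```python
import sqlite3
from functools import lru_cache


@lru_cache(maxsize=None)
def get_connection(db_path):
    """Open the connection on first use; later calls return the cached one."""
    return sqlite3.connect(db_path, check_same_thread=False)


def extract(tag, db_path=":memory:"):
    """Each call looks up the cached connection instead of reconnecting."""
    conn = get_connection(db_path)
    return conn.execute("SELECT ?", (tag,)).fetchone()[0]


first = get_connection(":memory:")
second = get_connection(":memory:")
values = [extract(tag) for tag in ["a", "b"]]
```

The cache lives per process, so with a process-based executor each worker would open its own connection once rather than sharing a single one.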

Kevin Kho

05/18/2022, 2:15 PM
Oh that's quite a big jump