
Dominic Pham

03/26/2022, 12:22 AM
Hi all, when I create a connection to a DB using prefect.tasks.sql_server and create a global temp table, will the temp table persist even after the flow is done? Or will I have to explicitly close the connection as a final task in the flow?

Kevin Kho

03/26/2022, 1:45 AM
If you use the SQL Server task of Prefect, the connection is closed inside the task - the source code is pretty readable if you check. As for the temp table, I'm honestly not sure, but I don't think it persists.

Anna Geller

03/26/2022, 9:00 AM
@Dominic Pham it depends on your query. If you execute a stored procedure written in a way that leverages an in-memory temp table, then the temp table doesn't persist. But if you manually create a temp table and just execute an INSERT or MERGE query, then the temp table will persist. So it depends on your query - Prefect doesn't limit you here in any way.
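The key point here is that temp tables are scoped to the database session that created them. A small stand-in demonstration using Python's built-in sqlite3 (whose TEMP tables are connection-scoped, analogous to - though not identical to - SQL Server's session-scoped temp tables; no SQL Server is needed to run it):

```python
import os
import sqlite3
import tempfile

# A file-backed database so it can be reopened from a second connection.
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

# Session 1: create a regular table and a TEMP table.
conn1 = sqlite3.connect(db_path)
conn1.execute("CREATE TABLE regular_tbl (id INTEGER)")
conn1.execute("CREATE TEMP TABLE temp_tbl (id INTEGER)")
conn1.execute("INSERT INTO temp_tbl VALUES (1)")
rows_in_session = conn1.execute("SELECT COUNT(*) FROM temp_tbl").fetchone()[0]
print(rows_in_session)  # 1 -- visible while the creating session is open
conn1.close()

# Session 2: the regular table survives; the temp table is gone.
conn2 = sqlite3.connect(db_path)
regular_survives = conn2.execute(
    "SELECT name FROM sqlite_master WHERE name = 'regular_tbl'"
).fetchone() is not None
temp_gone = False
try:
    conn2.execute("SELECT * FROM temp_tbl")
except sqlite3.OperationalError:
    temp_gone = True
conn2.close()
print(regular_survives, temp_gone)  # True True
```

The same scoping logic is why the connection-per-task behavior of the prebuilt Prefect task matters in the discussion below.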

Dominic Pham

03/28/2022, 5:25 PM
Thanks for the responses! The basic idea for this pipeline is to manually create a global temp table with raw SQL (prefect.tasks.sql_server will, I believe, close the connection/session once the task is done, as per Kevin), then move on to a different task that runs an INSERT and closes the connection (or on to a final task that closes the connection).

Anna Geller

03/28/2022, 6:13 PM
I see. Even if you are using a global temp table, you must be a bit more careful, since the session that created the table CANNOT be closed if the table is to be used in another session: "A global temporary table is automatically dropped when the session that created the table ends." So you can't use the prebuilt Prefect task for this, because you can't close the first connection and still use the global temp table in the second operation/task. I suggest you build a custom functional task, e.g.:
import pyodbc
from prefect import task


@task
def load_data_to_sql_server(dataframe, some_table_name):
    your_conn_str = None  # define your pyodbc connection string here
    # Copy the structure of an existing table into a temp table without
    # copying any rows (TOP 0 always returns zero rows):
    create_tmp_tbl_stmt = f"""select top 0 * into #temp_table from {some_table_name}"""
    # OR use a WHERE clause that is never true for the same effect:
    # create_tmp_tbl_stmt = f"""select * into #temp_table from {some_table_name} where 1=2"""

    insert_to_tmp_tbl_stmt = """insert into #temp_table VALUES (?,?,?,?,?,?,?,?,?,?)"""
    tuples = dataframe.values.tolist()

    with pyodbc.connect(your_conn_str, autocommit=True) as conn:
        cursor = conn.cursor()
        cursor.fast_executemany = True
        cursor.execute(create_tmp_tbl_stmt)
        cursor.executemany(insert_to_tmp_tbl_stmt, tuples)
        print(f"{len(dataframe)} rows inserted to the temp table")
        cursor.commit()
        cursor.close()
        conn.close()
The above code works with both local and global temp tables, because everything runs in one session. Check this if you need more info on that: https://stackoverflow.com/a/39278050/9509388
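The "everything in one session" pattern above can be sketched without SQL Server, again using sqlite3 as a stand-in: create the temp table, bulk-load it, and read it back, all on a single connection, before closing. The table and column names are illustrative only:

```python
import sqlite3

# One connection = one session: create the temp table, bulk-load it,
# and query it before closing, mirroring the single-session pyodbc task.
rows_to_load = [(1, "a"), (2, "b"), (3, "c")]

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TEMP TABLE temp_table (id INTEGER, val TEXT)")
cursor.executemany("INSERT INTO temp_table VALUES (?, ?)", rows_to_load)
loaded = cursor.execute("SELECT COUNT(*) FROM temp_table").fetchone()[0]
print(f"{loaded} rows inserted to the temp table")
conn.close()
```

Because all statements share one connection, the temp table never has to outlive its creating session, which is exactly why this works where the connection-per-task approach does not.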

Dominic Pham

03/28/2022, 6:39 PM
Ah yes the custom function code you defined is almost identical to what I have written so far, thanks!
Manually dropping the table could be a workaround to the first connection staying active, no?

Anna Geller

03/28/2022, 6:43 PM
Manually creating a regular (non-temp) table would be - the temp table is the issue.