# prefect-community
Adam
Hi all, does anyone have any examples of patterns for bulk loading data into Postgres from Prefect? Currently I’m mapping over my dataset and passing each individual record into the PostgresExecute task, which runs an insert statement. In addition to the thousands of INSERTs, the task also creates and terminates a Postgres connection each time, which I’d prefer to avoid. Thoughts? My current implementation is in the thread.
from prefect import Flow, unmapped
from prefect.tasks.postgres import PostgresExecute
from prefect.tasks.secrets import PrefectSecret

insert_customer = PostgresExecute(
    query="""
        INSERT INTO temp_deleted_customers (customer_id, synapse_account_id, date_deleted)
        VALUES (%s, %s, %s);
    """,
    db_name="xx",
    user="xx",
    host="xx",
    commit=True,
)


with Flow("Deleted Customer Nodes") as flow:
    deleted_customers = get_deleted_customers() # BigQueryTask
    closed_nodes = get_customer_deposit_nodes.map(deleted_customers) # Returns a (x,y,z) tuple per customer
    create_table = create_pg_table(
        password=PrefectSecret("POSTGRES_PASSWORD"), commit=True
    )
    insert_customer.map(
        data=closed_nodes, password=unmapped(PrefectSecret("POSTGRES_PASSWORD"))
    )
Dylan
Hi @Adam! Depending on how many customers you’re expecting at a time and the memory available in your execution environment, I’d suggest giving pandas a shot.
You can convert BigQuery results into a dataframe using the .to_dataframe method on a BigQuery result.
At that point you should be able to call .to_sql on that dataframe and perform a single insert into Postgres.
Keep in mind you’ll have to configure pandas a bit.
Might be a good place to start.
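[Editor’s note: a minimal sketch of the approach Dylan describes, assuming the google-cloud-bigquery and SQLAlchemy libraries; the query, connection string, and table name below are placeholders.]

from google.cloud import bigquery
from sqlalchemy import create_engine

client = bigquery.Client()

# Run the query and pull the full result set into a pandas DataFrame
df = client.query(
    "SELECT customer_id, synapse_account_id, date_deleted FROM my_dataset.deleted_customers"
).to_dataframe()

# Write the whole DataFrame to Postgres in a single to_sql call
engine = create_engine("postgresql+psycopg2://user:password@localhost/mydb")
df.to_sql("temp_deleted_customers", engine, if_exists="replace", index=False)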
Adam
Thanks @Dylan, that was just what I needed.
For those interested in the solution:
import pandas as pd
import prefect
from prefect import task
from sqlalchemy import create_engine


@task
def save_customers_to_sql(deleted_nodes, postgres_user, postgres_password):
    logger = prefect.context.get("logger")
    # List of dicts from the upstream mapped task -> DataFrame
    df = pd.DataFrame(deleted_nodes)
    df = df.dropna()
    engine = create_engine(
        f"postgresql+psycopg2://{postgres_user}:{postgres_password}@127.0.0.1/xxx",
        pool_recycle=3600,
    )
    conn = engine.connect()
    try:
        # Single bulk load instead of one INSERT per record
        df.to_sql("temp_deleted_customers", conn, if_exists="replace")
    finally:
        logger.info("Table has been created successfully")
        conn.close()
Where deleted_nodes is the output of a mapping task (a list of dicts, which turns nicely into a dataframe).
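[Editor’s note: a sketch, not from the thread, of how this task could replace the mapped insert in the flow shown earlier, assuming the same get_deleted_customers and get_customer_deposit_nodes tasks.]

with Flow("Deleted Customer Nodes") as flow:
    deleted_customers = get_deleted_customers()  # BigQueryTask
    closed_nodes = get_customer_deposit_nodes.map(deleted_customers)
    # One bulk load instead of one mapped INSERT (and connection) per record
    save_customers_to_sql(
        closed_nodes,
        postgres_user="xx",
        postgres_password=PrefectSecret("POSTGRES_PASSWORD"),
    )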
Dylan
Awesome!
Glad I could help =]
Dylan
If you’ve got a significant number of rows, you might want to consider passing a custom function to the method argument in df.to_sql to do a Postgres COPY rather than INSERT. With a large number of rows it may be an order of magnitude faster. Here’s an example from the pandas docs: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
👍 1
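[Editor’s note: the callable on that pandas docs page looks roughly like this; it streams the rows through psycopg2’s copy_expert, and you pass it to to_sql via method=. Table and engine names are the ones used earlier in the thread.]

import csv
from io import StringIO


def psql_insert_copy(table, conn, keys, data_iter):
    """Insert the DataFrame rows via Postgres COPY instead of row-by-row INSERTs."""
    # Raw psycopg2 connection underneath the SQLAlchemy connection
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        # Serialize the rows as CSV into an in-memory buffer
        buf = StringIO()
        csv.writer(buf).writerows(data_iter)
        buf.seek(0)

        columns = ", ".join(f'"{k}"' for k in keys)
        table_name = f"{table.schema}.{table.name}" if table.schema else table.name

        cur.copy_expert(f"COPY {table_name} ({columns}) FROM STDIN WITH CSV", buf)


# Usage inside the task above:
# df.to_sql("temp_deleted_customers", conn, if_exists="replace", method=psql_insert_copy)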