Adam
08/05/2020, 3:25 PM
I have a mapped PostgresExecute task which runs an insert statement. In addition to the thousands of INSERTs, the task also creates and terminates a Postgres connection each time, which I'd prefer to avoid. Thoughts? My current implementation is in the thread:

from prefect import Flow, unmapped
from prefect.tasks.postgres import PostgresExecute
from prefect.tasks.secrets import PrefectSecret

insert_customer = PostgresExecute(
    query="""
        INSERT INTO temp_deleted_customers (customer_id, synapse_account_id, date_deleted)
        VALUES (%s, %s, %s);
    """,
    db_name="xx",
    user="xx",
    host="xx",
    commit=True,
)

with Flow("Deleted Customer Nodes") as flow:
    deleted_customers = get_deleted_customers()  # BigQueryTask
    closed_nodes = get_customer_deposit_nodes.map(deleted_customers)  # returns an (x, y, z) tuple per customer
    create_table = create_pg_table(
        password=PrefectSecret("POSTGRES_PASSWORD"), commit=True
    )
    insert_customer.map(
        data=closed_nodes, password=unmapped(PrefectSecret("POSTGRES_PASSWORD"))
    )
Dylan
08/05/2020, 4:49 PM
You could call the .to_dataframe method on a BigQuery result, then .to_sql on that dataframe, and perform a single insert into Postgres.
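As a rough sketch of that approach (the table and connection names here are placeholders; it assumes google-cloud-bigquery, pandas, and SQLAlchemy are installed):

from google.cloud import bigquery
from sqlalchemy import create_engine

def bigquery_to_postgres(query: str, pg_url: str) -> None:
    # Materialise the BigQuery result as a pandas DataFrame
    df = bigquery.Client().query(query).result().to_dataframe()
    # One bulk insert into Postgres instead of thousands of mapped INSERTs
    engine = create_engine(pg_url)
    df.to_sql("temp_deleted_customers", engine, if_exists="replace", index=False)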
Adam
08/05/2020, 5:56 PM

import pandas as pd
import prefect
from prefect import task
from sqlalchemy import create_engine

@task
def save_customers_to_sql(deleted_nodes, postgres_user, postgres_password):
    logger = prefect.context.get("logger")
    df = pd.DataFrame(deleted_nodes)
    df = df.dropna()
    engine = create_engine(
        f"postgresql+psycopg2://{postgres_user}:{postgres_password}@127.0.0.1/xxx",
        pool_recycle=3600,
    )
    conn = engine.connect()
    try:
        df.to_sql("temp_deleted_customers", conn, if_exists="replace")
        logger.info("Table has been created successfully")
    finally:
        conn.close()

Where deleted_nodes is the output of a mapping task (a list of dicts, which turn nicely into a dataframe).
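For context, one possible way to wire that task into the flow from the first message (a sketch only, assuming the upstream tasks and secret are unchanged); passing the mapped result to a non-mapped task gathers it into a single list, so the bulk insert runs once:

with Flow("Deleted Customer Nodes") as flow:
    deleted_customers = get_deleted_customers()                       # BigQueryTask
    closed_nodes = get_customer_deposit_nodes.map(deleted_customers)  # mapped over customers

    # The mapped results are gathered ("reduced") into one list here,
    # so save_customers_to_sql performs a single bulk insert.
    save_customers_to_sql(
        closed_nodes,
        postgres_user="xx",
        postgres_password=PrefectSecret("POSTGRES_PASSWORD"),
    )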
Dylan
08/05/2020, 6:46 PM
Dan Ball
08/06/2020, 3:35 PM
You can also use the method argument in df.to_sql to do a Postgres COPY rather than an INSERT. With a large number of rows it may be an order of magnitude faster.
Here’s an example from the Pandas docs:
https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
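For reference, the helper shown on that page looks roughly like this (lightly adapted; it assumes the SQLAlchemy engine is backed by psycopg2), and it plugs into to_sql via the method argument:

import csv
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    """Load data with Postgres COPY FROM STDIN instead of row-by-row INSERTs."""
    # conn is a SQLAlchemy connection; grab the raw psycopg2 connection
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        buf = StringIO()
        csv.writer(buf).writerows(data_iter)
        buf.seek(0)

        columns = ", ".join(f'"{k}"' for k in keys)
        table_name = f"{table.schema}.{table.name}" if table.schema else table.name
        cur.copy_expert(f"COPY {table_name} ({columns}) FROM STDIN WITH CSV", buf)

# usage:
# df.to_sql("temp_deleted_customers", engine, if_exists="replace",
#           index=False, method=psql_insert_copy)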