Henrietta Salonen
02/25/2022, 2:31 PM
How do I pass the `data` argument correctly for Prefect's `PostgresExecute` task together with the `query` argument?
I have two different dataframes that I would like to insert into Postgres tables. This is what I have now, but I'm unsure how to pass the `data` argument here:
```
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.postgres import PostgresExecute

# database, user, host, port, schema_name, postgres_pwd, df1, df2,
# df1_columns and df2_columns are defined elsewhere.

@task
def make_query(schema_name, table_name, columns):
    return f'''
    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ({columns});
    INSERT INTO {schema_name}.{table_name} ({columns}) VALUES ();
    '''

with Flow("postgres_test") as flow:
    execute_task = PostgresExecute(db_name=database, user=user, host=host, port=port, commit=True)
    table_name = Parameter("table_name", default=[df1.name, df2.name])
    columns = Parameter("columns", default=[df1_columns, df2_columns])
    query = make_query.map(unmapped(schema_name), table_name, columns)
    execute_task.map(password=unmapped(postgres_pwd), query=query)

if __name__ == "__main__":
    flow.run()
```
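One way the `data` argument could be prepared is sketched below. It assumes that `PostgresExecute` hands `query` and `data` to psycopg2's `cursor.execute`, so `data` would be a tuple of values matching `%s` placeholders in the query. The helpers `build_insert` and `rows_as_tuples`, and the sample schema, table, and records, are hypothetical names made up for illustration; they are not part of Prefect.

```python
def build_insert(schema_name, table_name, column_names):
    """Return an INSERT statement with one %s placeholder per column."""
    cols = ", ".join(column_names)
    placeholders = ", ".join(["%s"] * len(column_names))
    # Identifiers are interpolated directly; only the values are parameterized.
    return f"INSERT INTO {schema_name}.{table_name} ({cols}) VALUES ({placeholders});"


def rows_as_tuples(records, column_names):
    """Convert a list of dict records into tuples ordered like column_names."""
    return [tuple(record[name] for name in column_names) for record in records]


# Hypothetical sample data standing in for one dataframe.
column_names = ["id", "city"]
records = [{"id": 1, "city": "Helsinki"}, {"id": 2, "city": "Turku"}]

query = build_insert("public", "cities", column_names)
data = rows_as_tuples(records, column_names)

print(query)
print(data)
```

Under that assumption, a single row such as `data[0]` could be passed as `data=` next to `query=` when calling the task. For many rows, the full list would more likely fit `PostgresExecuteMany`, which (as far as I know) wraps `cursor.executemany`. With a pandas dataframe, `list(df.itertuples(index=False, name=None))` should yield the same list-of-tuples shape.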