iñigo
02/18/2022, 1:57 PMAnna Geller
from prefect import Flow, Parameter, task
from prefect.tasks.postgres import PostgresExecute, PostgresExecuteMany, PostgresFetch
from prefect.tasks.secrets import PrefectSecret
postgres_fetch_task = PostgresFetch(
db_name="postgres",
user="postgres",
host="localhost",
port=5432,
commit=True,
fetch="all",
)
@task
def define_query_from_param(table_name: str):
return f"SELECT * FROM {table_name};"
@task(log_stdout=True)
def print_data(x):
print(x) # prints a list of tuples
with Flow("postgres_example") as flow:
postgres_pwd = PrefectSecret("POSTGRES_PASSWORD")
table_name = Parameter("table_name", default="stage.customers")
query = define_query_from_param(table_name)
final_data = postgres_fetch_task(password=postgres_pwd, query=query)
print_data(final_data)
if __name__ == "__main__":
flow.run()
iñigo
02/18/2022, 7:02 PMAnna Geller
iñigo
02/18/2022, 7:31 PMAnna Geller
iñigo
02/18/2022, 7:59 PM