Charles Phares
07/27/2022, 11:40 PMKhuyen Tran
07/27/2022, 11:45 PMCharles Phares
07/27/2022, 11:47 PMKhuyen Tran
07/27/2022, 11:51 PMCharles Phares
07/27/2022, 11:55 PMBilly McMonagle
07/28/2022, 2:06 AMCharles Phares
07/28/2022, 5:20 AMBilly McMonagle
07/28/2022, 12:32 PMimport pendulum
from my_private_package import PostgresClient, SnowflakeClient, slack_failure_notification
from prefect import Flow, Parameter, task
from prefect.schedules import CronSchedule
@task
def get_data_from_snowflake(project_name, report_date):
with SnowflakeClient() as snowflake:
data = snowflake.execute(
sql_statement=get_data_sql_statement,
params={
"project_name": project_name,
"report_date": report_date,
},
)
return data
@task
def load_data_to_postgres(data):
with PostgresClient() as pg:
pg.execute(sql_statement=load_data_sql_statement, values=data)
with Flow(
"Snowflake to PostgreSQL",
schedule=CronSchedule("0 8 * * 1-5", start_date=pendulum.now("America/New_York")),
state_handlers=[slack_failure_notification],
) as flow:
project_name = Parameter("project")
report_date = Parameter("report_data")
data = get_data_from_snowflake(project_name=project_name, report_date=report_date)
load_data_to_postgres(data)