Tyson Chavarie
07/27/2023, 7:35 PMMarvin
07/27/2023, 7:35 PMTyson Chavarie
07/27/2023, 7:36 PMNate
07/27/2023, 7:36 PMTyson Chavarie
07/27/2023, 7:37 PMNate
07/27/2023, 7:37 PM@flow(task_runner=...)
i mean specifying a task runner likeTyson Chavarie
07/27/2023, 7:37 PM@flow
def reverse_etl_sql_server(report_date_param: str = None, file_date_param: str = None, pg_connect_method: str = "TCP"):
logger = get_run_logger()
try:
postgres_creds = load_secret(logger, "postgres-creds")
sql_server_creds = load_secret(logger, "mssql-creds")
except Exception as e:
logger.error(f"Failed to load or process a secret: {e}")
raise
# Set dynamic parameters
report_date = get_report_date(report_date_param)
file_date = get_file_date(file_date_param)
tables_to_replicate = get_tables_to_replicate(sql_server_creds)
with pg.get_postgres_conn(postgres_creds, pg_connect_method) as postgres_client:
extracted_tables = extract_postgres_table.map(tables_to_replicate, unmapped(postgres_client), unmapped(report_date), unmapped(sql_server_creds))
loaded_tables = load_csv_to_mssql.map(tables_to_replicate, extracted_tables, unmapped(sql_server_creds))
merged_tables = merge_sql_server_tables.map(loaded_tables, unmapped(sql_server_creds))
pol_drd_last_call = build_pol_drd_last_call(merged_tables, sql_server_creds)
dashboard_procs = get_dashboard_procs(pol_drd_last_call)
completed_procs = run_dashboard_procs.map(dashboard_procs, unmapped(sql_server_creds))
Nate
07/27/2023, 7:42 PMTyson Chavarie
07/27/2023, 7:53 PMJul 27th, 2023
Downloading flow code from storage at ''
03:57:00 PM
prefect.flow_runs
Created task run 'get_report_date_reverse_etl-0' for task 'get_report_date_reverse_etl'
03:57:03 PM
prefect.flow_runs
Executing 'get_report_date_reverse_etl-0' immediately...
03:57:03 PM
prefect.flow_runs
2023-07-27
03:57:04 PM
get_report_date_reverse_etl-0
prefect.task_runs
Finished in state Completed()
03:57:04 PM
get_report_date_reverse_etl-0
prefect.task_runs
Created task run 'get_file_date_reverse_etl-0' for task 'get_file_date_reverse_etl'
03:57:04 PM
prefect.flow_runs
Executing 'get_file_date_reverse_etl-0' immediately...
03:57:04 PM
prefect.flow_runs
20230727155622
03:57:04 PM
get_file_date_reverse_etl-0
prefect.task_runs
Finished in state Completed()
03:57:04 PM
get_file_date_reverse_etl-0
prefect.task_runs
Created task run 'get_tables_to_replicate-0' for task 'get_tables_to_replicate'
03:57:04 PM
prefect.flow_runs
Executing 'get_tables_to_replicate-0' immediately...
03:57:04 PM
prefect.flow_runs
<class 'map'>
03:57:05 PM
get_tables_to_replicate-0
prefect.task_runs
Finished in state Completed()
03:57:05 PM
get_tables_to_replicate-0
prefect.task_runs
Successfully connected to the PostgreSQL server
03:57:05 PM
prefect.flow_runs
Created task run 'extract_postgres_table-0' for task 'extract_postgres_table'
03:57:05 PM
prefect.flow_runs
Submitted task run 'extract_postgres_table-0' for execution.
03:57:05 PM
prefect.flow_runs
Created task run 'extract_postgres_table-5' for task 'extract_postgres_table'
03:57:05 PM
prefect.flow_runs
Submitted task run 'extract_postgres_table-5' for execution.
03:57:05 PM
prefect.flow_runs
Created task run 'extract_postgres_table-8' for task 'extract_postgres_table'
03:57:05 PM
prefect.flow_runs
Nate
07/27/2023, 8:07 PMTyson Chavarie
07/27/2023, 8:07 PMNate
07/27/2023, 8:10 PMTyson Chavarie
07/27/2023, 8:10 PMNate
07/27/2023, 8:11 PMMarvin
07/27/2023, 8:11 PMTyson Chavarie
07/27/2023, 8:11 PMNate
07/27/2023, 8:11 PMTK
07/27/2023, 8:12 PM