Adam Everington
12/16/2021, 10:29 AM
[…] `LocalDaskExecutor` it understandably fails. However, when I set upstream tasks so that they essentially execute one after the other, it still fails. If I use a `LocalExecutor` it works fine. Thoughts?
Anna Geller
Adam Everington
12/16/2021, 10:49 AM
Adam Everington
12/16/2021, 10:49 AM
Anna Geller
from prefect import task, Flow
@task
def first_task():
    """No-op placeholder task; it only exists to be a node in the example flow."""
    pass
@task
def second_task():
    """No-op placeholder task; it only exists to be a node in the example flow."""
    pass
@task
def third_task():
    """No-op placeholder task; it only exists to be a node in the example flow."""
    pass
# The tasks exchange no data, so ordering is expressed purely through
# `upstream_tasks`: first_task -> second_task -> third_task.
with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

# Render the task graph to check the dependency chain.
flow.visualize()
Anna Geller
Adam Everington
12/16/2021, 11:02 AM
Anna Geller
Adam Everington
12/16/2021, 11:10 AM
Adam Everington
12/16/2021, 11:11 AMwith Flow('Transfer'
#,executor=LocalDaskExecutor()
) as flow:
#params
source_server = Parameter('Source server name',default='SQL10')
source_ap_db = Parameter('Source ap database name',default='WalthamForrest_AP_v3')
source_capture_db = Parameter('Source capture database name',default='WalthamForrest_Capture_v2')
target_server = Parameter('Target server name',default='SQL08')
target_db = Parameter('Target database name',default='TestClient2_Capture_v4_test')
load_ap = Parameter('Load in AP data?',default=True)
load_capture = Parameter('Load in capture data?',default=True)
#cnxn strings
source_ap_cnxn_string = get_cnxn_string(server=source_server,database=source_ap_db)
source_ap_engine = get_alchemy_engine(source_ap_cnxn_string)
source_capture_cnxn_string = get_cnxn_string(server=source_server,database=source_capture_db)
source_capture_engine = get_alchemy_engine(source_capture_cnxn_string)
target_cnxn_string = get_cnxn_string(server=target_server,database=target_db)
target_engine = get_alchemy_engine(target_cnxn_string)
#prep db
disable_constraints = disable_all_db_constraints(target_engine)
#load ap?
with case(load_ap,True):
#etlaudit
etlaudit = source_get_etlaudit(source_ap_engine)
load_etl = load_to_sql_server(etlaudit,target_engine,'ETLAUDIT',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[disable_constraints])
#supplier
supp_nk,supps = source_get_suppliers(source_ap_engine)
load_suppnk = load_to_sql_server(supp_nk,target_engine,'SupplierNaturalKey',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_etl,disable_constraints])
load_supps = load_to_sql_server(supps,target_engine,'Supplier',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
create_supplier_nk = create_suppliernk_view(target_engine,upstream_tasks=[load_suppnk])
#Invoices
invoice_nk = source_get_invoice_nk(source_ap_engine)
load_invoicenk = load_to_sql_server(invoice_nk,target_engine,'InvoiceNaturalKey',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_suppnk,disable_constraints])
create_invoice_nk = create_invoicenk_view(target_engine,invoice_nk)
invoice,invoice_audit = source_get_invoices(source_ap_engine)
load_invoice = load_to_sql_server(invoice,target_engine,'Invoice',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
load_invoice_audit = load_to_sql_server(invoice_audit,target_engine,'InvoiceAudit',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
invoice_cf = source_get_invoice_customfields(source_ap_engine)
load_invoice_cf = load_to_sql_server(invoice_cf,target_engine,'InvoiceCustomField',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
invoice_detail = source_get_invoice_detail(source_ap_engine)
load_invoice_detail = load_to_sql_server(invoice_detail,target_engine,'InvoiceDetail',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_invoicenk,disable_constraints])
invoice_detail_cf = source_get_invoice_detail_cf(source_ap_engine)
load_invoice_cf_detail = load_to_sql_server(invoice_detail_cf,target_engine,'InvoiceDetailCustomField',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
#load capture?
with case(load_capture,True):
executions = source_get_executions(source_capture_engine)
load_executions = load_to_sql_server(executions,target_engine,'Execution',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
dupe_data = source_get_dupedata(source_capture_engine)
load_dupe_data = load_to_sql_server(dupe_data,target_engine,'DupeStandard_Invoice',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
tax_data = source_get_taxoverview(source_capture_engine)
load_tax_data = load_to_sql_server(tax_data,target_engine,'TaxOverview',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
op_net_data = source_get_netoverpayments(source_capture_engine)
load_op_net_data = load_to_sql_server(op_net_data,target_engine,'OverpaymentsNet',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
op_tax_data = source_get_taxoverpayments(source_capture_engine)
load_op_tax_data = load_to_sql_server(op_tax_data,target_engine,'OverpaymentsTax',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
seq_overview_data = source_get_sequencing_overview(source_capture_engine)
load_seq_overview_data = load_to_sql_server(seq_overview_data,target_engine,'SequencingOverview',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
target_update_sequencing_invoice_count(target_engine,upstream_tasks=[load_seq_overview_data,load_invoice])
spikes_overview_data = source_get_spikes_overview(source_capture_engine)
load_spikes_overview_data = load_to_sql_server(spikes_overview_data,target_engine,'SpikeOverview',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
target_update_spikes_invoice_count(target_engine,upstream_tasks=[load_spikes_overview_data,load_invoice])
charmap_data = source_get_suppinvnocharmap(source_capture_engine)
load_charmapdata = load_to_sql_server(charmap_data,target_engine,'SupplierInvoiceCharMap',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_invoice_detail,disable_constraints])
exec_stats_data = source_get_execution_stats(source_capture_engine)
load_exec_stats_data = load_to_sql_server(exec_stats_data,target_engine,'ExecutionStatistics',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
exec_summary_data = source_get_execution_dupe_summary(source_capture_engine)
load_exec_summary_data = load_to_sql_server(exec_summary_data,target_engine,'ExecutionDupeSummary',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_charmapdata,disable_constraints])
tax_summary_data = source_get_tax_summary_data(source_capture_engine)
load_tax_summary_data = load_to_sql_server(tax_summary_data,target_engine,'TaxSummary',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_exec_summary_data,disable_constraints])
supplier_month_stats_data = source_get_supplier_month_stats_data(source_capture_engine)
load_supplier_month_stats_data = load_to_sql_server(supplier_month_stats_data,target_engine,'SupplierMonthStats',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_tax_summary_data,disable_constraints])
year_month_stats_data = source_get_year_month_stats_data(source_capture_engine)
load_year_month_stats_data = load_to_sql_server(year_month_stats_data,target_engine,'YearMonthTransactionStatistics',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
#tidy db
enable_all_db_constraints(target_engine,upstream_tasks=[
load_suppnk,
load_supps,
load_invoicenk,
load_invoice,
load_invoice_audit,
load_invoice_cf,
load_invoice_detail,
load_invoice_cf_detail,
load_executions,
load_dupe_data,
load_tax_data,
load_op_net_data,
load_op_tax_data,
load_seq_overview_data,
load_spikes_overview_data,
load_charmapdata,
load_exec_stats_data,
load_exec_summary_data,
load_tax_summary_data,
load_supplier_month_stats_data,
load_year_month_stats_data
])
Anna Geller
Anna Geller
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
# Parent flow: maps `create_flow_run` over the three child flow names in the
# "Flow_of_Flows" project, then maps `wait_for_flow_run` over the resulting
# run ids. `unmapped(...)` passes the same value to every mapped call.
with Flow("MasterFlow_Mapped", executor=LocalDaskExecutor()) as flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["etlaudit", "supplier", "invoices"],
        project_name=unmapped("Flow_of_Flows"),
    )
    wait_for_flow_run.map(
        flow_run_id=mapped_flow_run_ids,
        raise_final_state=unmapped(True),
    )