# ask-community
a
Ok, not sure whether this is a Prefect or a SQL Server issue, but... SQL Server only allows IDENTITY_INSERT to be ON for one table at a time per session, so when I run, say, 5 tasks that each set IDENTITY_INSERT ON, insert the records and then set IDENTITY_INSERT back OFF using a `LocalDaskExecutor`, it understandably fails. However, even when I set upstream tasks so that they execute one after the other, it still fails. If I use a `LocalExecutor`, it works fine. Thoughts?
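To make the per-session rule concrete, here is a minimal sketch of the ON / insert / OFF pattern. It assumes a DB-API cursor with `?` placeholders, such as pyodbc's. `load_with_identity_insert` and `RecordingCursor` are hypothetical names; they are not part of Prefect or the thread's `load_to_sql_server` task, whose internals we don't know. The one point it illustrates is that all three statements go through the same cursor, and therefore the same session.

```python
from typing import Iterable, Sequence


def load_with_identity_insert(cursor, table: str, columns: Sequence[str],
                              rows: Iterable[Sequence]) -> int:
    """Insert rows carrying explicit identity values through ONE cursor (one session).

    Assumes a DB-API cursor with qmark ("?") placeholders, such as pyodbc's.
    `table` and `columns` are pasted into the SQL text, so they must be trusted
    identifiers, never user input. Committing is left to the caller.
    Returns the number of rows sent.
    """
    rows = list(rows)
    if not rows:
        return 0  # nothing to insert, so never touch IDENTITY_INSERT
    col_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    # SQL Server only accepts explicit identity values when the INSERT names its columns.
    insert_sql = f"INSERT INTO {table} ({col_list}) VALUES ({placeholders})"

    # IDENTITY_INSERT is a per-session setting, so ON, INSERT and OFF all go
    # through this same cursor instead of separately opened connections.
    cursor.execute(f"SET IDENTITY_INSERT {table} ON")
    try:
        cursor.executemany(insert_sql, rows)
    finally:
        # Switch it back OFF even if the insert fails, so another table in this
        # session can be switched ON afterwards.
        cursor.execute(f"SET IDENTITY_INSERT {table} OFF")
    return len(rows)


class RecordingCursor:
    """Stand-in for a real cursor: records SQL instead of sending it to SQL Server."""

    def __init__(self):
        self.statements = []

    def execute(self, sql, *params):
        self.statements.append(sql)

    def executemany(self, sql, seq_of_params):
        self.statements.append((sql, len(seq_of_params)))


if __name__ == "__main__":
    # dbo.ExampleTable with columns Id/Label is a made-up table for illustration.
    cur = RecordingCursor()
    load_with_identity_insert(cur, "dbo.ExampleTable", ["Id", "Label"], [(1, "x"), (2, "y")])
    for stmt in cur.statements:
        print(stmt)
```

With a real pyodbc connection you would pass `conn.cursor()` and call `conn.commit()` afterwards. If the ON statement and the insert were instead issued on two different pooled connections, SQL Server would treat them as separate sessions.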
a
@Adam Everington if you use LocalDaskExecutor without mapping, and your tasks have no sequential dependencies on each other, then LocalDaskExecutor will attempt to run those tasks in parallel; this seems to be why you get this issue with multiple SQL Server sessions. In general, for this kind of task it’s probably useful to do everything sequentially and use only the LocalExecutor, right? This helps to make sure you don’t accidentally cross any limits of your DB.
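What "no sequential dependencies" means can be shown with a simplified model. This is not Prefect's actual scheduler. Given each task's upstream tasks, the sketch groups tasks into waves so that every task comes after all of its upstreams. Tasks in the same wave have no dependency on each other, so a parallel executor such as LocalDaskExecutor is free to run them at the same time. A chain A -> B -> C puts each task in its own wave. `dependency_waves` is a hypothetical helper, and the second example copies a few load-task edges from the flow posted later in the thread, with the extract tasks left out.

```python
from typing import Dict, List, Set


def dependency_waves(upstream: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves; each task lands after all of its upstream tasks.

    Every task must appear as a key (use an empty set for "no upstreams").
    Tasks sharing a wave have no dependency between them, so a parallel
    executor may run them concurrently.
    """
    remaining = {name: set(deps) for name, deps in upstream.items()}
    done: Set[str] = set()
    waves: List[List[str]] = []
    while remaining:
        # A task is ready once all of its upstream tasks have finished.
        ready = sorted(name for name, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError("cycle detected among: " + ", ".join(sorted(remaining)))
        waves.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves


if __name__ == "__main__":
    # A -> B -> C: one task per wave, so nothing can overlap.
    print(dependency_waves({"A": set(), "B": {"A"}, "C": {"B"}}))

    # Load-task edges from the posted flow: the chained identity loads get
    # separate waves, but load_supps shares a wave with load_etl.
    print(dependency_waves({
        "disable_constraints": set(),
        "load_etl": {"disable_constraints"},
        "load_suppnk": {"load_etl", "disable_constraints"},
        "load_supps": {"disable_constraints"},
    }))
```

In the second example, chaining keeps `load_etl` and `load_suppnk` from overlapping with each other, but it does not stop them from overlapping with unrelated loads in the same wave.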
a
Morning @Anna Geller! What do you mean by sequential dependencies? In my example I have my 5 tasks and I've chained the dependencies so that one executes after the other. (Admittedly, in my actual script I have other inserts going through the task in parallel, but the ones that require identity insert are chained.)
If I have to use `LocalExecutor`, that's fine... I'm just trying to get some speed gains.
a
This is what I meant: A -> B -> C means nothing can run in parallel.
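```python
from prefect import task, Flow

@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    # upstream_tasks adds a dependency without passing data:
    # B waits for A and C waits for B, so the three run strictly one after another
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

flow.visualize()  # draws the DAG (needs graphviz installed)
```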
If you want speed gains, maybe you can separate the transformations from the load? For example, you can use LocalDaskExecutor to grab your data, do whatever you need to do with it and dump the results at the end, e.g. to S3; all of this can run in parallel. Then a separate process (e.g. a separate task in the same flow) can take care of reading and batch-loading all those files into SQL Server.
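Here is a minimal plain-Python sketch of that split. It uses `concurrent.futures` rather than Prefect, and it keeps extracted rows in memory instead of staging files in S3. `extract_parallel_then_load`, the extractor callables and `load` are hypothetical names chosen for illustration. Phase 1 overlaps all reads, and phase 2 performs every write from a single thread.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

Rows = List[tuple]


def extract_parallel_then_load(extractors: Dict[str, Callable[[], Rows]],
                               load: Callable[[str, Rows], None],
                               max_workers: int = 4) -> List[str]:
    """Phase 1: run all extractors concurrently. Phase 2: load tables one at a time.

    Tables are loaded in the dict's insertion order, so list parent tables
    before the tables that reference them. Returns the tables in load order.
    """
    # Phase 1: reading and transforming can safely overlap.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {table: pool.submit(fn) for table, fn in extractors.items()}
        extracted = {table: fut.result() for table, fut in futures.items()}

    # Phase 2: this single thread performs every write, so (as long as `load`
    # is synchronous) loads that toggle IDENTITY_INSERT can never overlap.
    loaded: List[str] = []
    for table in extractors:
        load(table, extracted[table])
        loaded.append(table)
    return loaded


if __name__ == "__main__":
    def fake_extract(n: int) -> Callable[[], Rows]:
        # Stand-in for a real source query returning n rows.
        return lambda: [(i,) for i in range(n)]

    order = extract_parallel_then_load(
        {"ETLAUDIT": fake_extract(3), "SupplierNaturalKey": fake_extract(5)},
        load=lambda table, rows: print(f"loading {len(rows)} rows into {table}"),
    )
    print(order)
```

This design buys parallelism only on the read side, where the source queries usually spend their time. The target database still sees one writer at a time.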
a
Hey Anna, thanks for this. The example you've provided is exactly how I've set up the tasks in question, but I still get errors saying that I haven't turned IDENTITY_INSERT on.
a
Can you share your flow structure, i.e. your Flow block?
a
Yep... give me a sec and I'll comment out all of the tasks that run alongside.
```python
with Flow('Transfer'
    #,executor=LocalDaskExecutor()
    ) as flow:
    #params
    source_server = Parameter('Source server name',default='SQL10')
    source_ap_db = Parameter('Source ap database name',default='WalthamForrest_AP_v3')
    source_capture_db = Parameter('Source capture database name',default='WalthamForrest_Capture_v2')
    target_server = Parameter('Target server name',default='SQL08')
    target_db = Parameter('Target database name',default='TestClient2_Capture_v4_test')
    load_ap = Parameter('Load in AP data?',default=True)
    load_capture = Parameter('Load in capture data?',default=True)

    #cnxn strings
    source_ap_cnxn_string = get_cnxn_string(server=source_server,database=source_ap_db)
    source_ap_engine = get_alchemy_engine(source_ap_cnxn_string)
    source_capture_cnxn_string = get_cnxn_string(server=source_server,database=source_capture_db)
    source_capture_engine = get_alchemy_engine(source_capture_cnxn_string)
    target_cnxn_string = get_cnxn_string(server=target_server,database=target_db)
    target_engine = get_alchemy_engine(target_cnxn_string)

    #prep db
    disable_constraints = disable_all_db_constraints(target_engine)

    #load ap?
    with case(load_ap,True):
        #etlaudit
        etlaudit = source_get_etlaudit(source_ap_engine)
        load_etl = load_to_sql_server(etlaudit,target_engine,'ETLAUDIT',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[disable_constraints])

        #supplier
        supp_nk,supps = source_get_suppliers(source_ap_engine)
        load_suppnk = load_to_sql_server(supp_nk,target_engine,'SupplierNaturalKey',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_etl,disable_constraints])
        load_supps = load_to_sql_server(supps,target_engine,'Supplier',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        create_supplier_nk = create_suppliernk_view(target_engine,upstream_tasks=[load_suppnk])

        #Invoices
        invoice_nk = source_get_invoice_nk(source_ap_engine)
        load_invoicenk = load_to_sql_server(invoice_nk,target_engine,'InvoiceNaturalKey',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_suppnk,disable_constraints])
        create_invoice_nk = create_invoicenk_view(target_engine,invoice_nk)
        invoice,invoice_audit = source_get_invoices(source_ap_engine)
        load_invoice = load_to_sql_server(invoice,target_engine,'Invoice',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        load_invoice_audit = load_to_sql_server(invoice_audit,target_engine,'InvoiceAudit',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        invoice_cf = source_get_invoice_customfields(source_ap_engine)
        load_invoice_cf = load_to_sql_server(invoice_cf,target_engine,'InvoiceCustomField',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        invoice_detail = source_get_invoice_detail(source_ap_engine)
        load_invoice_detail = load_to_sql_server(invoice_detail,target_engine,'InvoiceDetail',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_invoicenk,disable_constraints])
        invoice_detail_cf = source_get_invoice_detail_cf(source_ap_engine)
        load_invoice_cf_detail = load_to_sql_server(invoice_detail_cf,target_engine,'InvoiceDetailCustomField',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])

    #load capture?
    with case(load_capture,True):
        executions = source_get_executions(source_capture_engine)
        load_executions = load_to_sql_server(executions,target_engine,'Execution',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        dupe_data = source_get_dupedata(source_capture_engine)
        load_dupe_data = load_to_sql_server(dupe_data,target_engine,'DupeStandard_Invoice',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        tax_data = source_get_taxoverview(source_capture_engine)
        load_tax_data = load_to_sql_server(tax_data,target_engine,'TaxOverview',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        op_net_data = source_get_netoverpayments(source_capture_engine)
        load_op_net_data = load_to_sql_server(op_net_data,target_engine,'OverpaymentsNet',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        op_tax_data = source_get_taxoverpayments(source_capture_engine)
        load_op_tax_data = load_to_sql_server(op_tax_data,target_engine,'OverpaymentsTax',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        seq_overview_data = source_get_sequencing_overview(source_capture_engine)
        load_seq_overview_data = load_to_sql_server(seq_overview_data,target_engine,'SequencingOverview',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        target_update_sequencing_invoice_count(target_engine,upstream_tasks=[load_seq_overview_data,load_invoice])
        spikes_overview_data = source_get_spikes_overview(source_capture_engine)
        load_spikes_overview_data = load_to_sql_server(spikes_overview_data,target_engine,'SpikeOverview',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        target_update_spikes_invoice_count(target_engine,upstream_tasks=[load_spikes_overview_data,load_invoice])
        charmap_data = source_get_suppinvnocharmap(source_capture_engine)
        load_charmapdata = load_to_sql_server(charmap_data,target_engine,'SupplierInvoiceCharMap',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_invoice_detail,disable_constraints])
        exec_stats_data = source_get_execution_stats(source_capture_engine)
        load_exec_stats_data = load_to_sql_server(exec_stats_data,target_engine,'ExecutionStatistics',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])
        exec_summary_data = source_get_execution_dupe_summary(source_capture_engine)
        load_exec_summary_data = load_to_sql_server(exec_summary_data,target_engine,'ExecutionDupeSummary',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_charmapdata,disable_constraints])
        tax_summary_data = source_get_tax_summary_data(source_capture_engine)
        load_tax_summary_data = load_to_sql_server(tax_summary_data,target_engine,'TaxSummary',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_exec_summary_data,disable_constraints])
        supplier_month_stats_data = source_get_supplier_month_stats_data(source_capture_engine)
        load_supplier_month_stats_data = load_to_sql_server(supplier_month_stats_data,target_engine,'SupplierMonthStats',has_autoincrement=True,truncate_before_insert=True,upstream_tasks=[load_tax_summary_data,disable_constraints])
        year_month_stats_data = source_get_year_month_stats_data(source_capture_engine)
        load_year_month_stats_data = load_to_sql_server(year_month_stats_data,target_engine,'YearMonthTransactionStatistics',has_autoincrement=False,truncate_before_insert=True,upstream_tasks=[disable_constraints])

    #tidy db
    enable_all_db_constraints(target_engine,upstream_tasks=[
        load_suppnk,
        load_supps,
        load_invoicenk,
        load_invoice,
        load_invoice_audit,
        load_invoice_cf,
        load_invoice_detail,
        load_invoice_cf_detail,
        load_executions,
        load_dupe_data,
        load_tax_data,
        load_op_net_data,
        load_op_tax_data,
        load_seq_overview_data,
        load_spikes_overview_data,
        load_charmapdata,
        load_exec_stats_data,
        load_exec_summary_data,
        load_tax_summary_data,
        load_supplier_month_stats_data,
        load_year_month_stats_data
    ])
```
a
@Adam Everington in the current design I don’t see any potential for parallelism, unless you split the AP and Capture loads into separate flows (the etlaudit, supplier and invoices sections also seem to be independent of the others and could be separate flows) and then map over those flows to run them in parallel.
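```python
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor


with Flow("MasterFlow_Mapped", executor=LocalDaskExecutor()) as flow:
    # one child flow run per flow registered under these names in the
    # "Flow_of_Flows" project; the runs are independent, so
    # LocalDaskExecutor can start them in parallel
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["etlaudit", "supplier", "invoices"],
        project_name=unmapped("Flow_of_Flows"),
    )
    # wait for every child run; raise_final_state gives each wait task the
    # final state of its child run, so a failed child shows up as a failure here
    wait_for_flow_run.map(
        flow_run_id=mapped_flow_run_ids, raise_final_state=unmapped(True)
    )
```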