Adam Everington
11/15/2021, 10:26 AM
Anna Geller
Adam Everington
11/15/2021, 10:46 AMwith Flow("Load-Data-To-Azure"
,executor=LocalDaskExecutor()
,storage=Git(
flow_path = ".my-flow.py",
repo="prefect",
branch_name="main",
git_clone_url_secret_name="DEVOPS_URL"
)
) as flow:
# Flow "Load-Data-To-Azure": executed with a LocalDaskExecutor and stored in Git
# (repo "prefect", branch "main", clone URL taken from the secret named DEVOPS_URL).
# The body below only wires tasks together; all task functions are defined elsewhere.
#Params
server = Parameter('Server Name',default='my-azure-server')
database= Parameter('Database Name',default='MyDb')
working_directory = Parameter('Working Directory',default=r'\\full-unc-to-dir')
user = Parameter('SQL User Name', default='MyUser')
#Secrets
# The password is looked up from the 'SQL User Name' parameter value.
pw = get_secret_value(user)
rsa = PrefectSecret('SFTP_RSA_KEY')
#cnxn properties
cnxn_string = get_cnxn_string(server,database,user,pw)
#Master Key
# Passed as parent_audit_key to every per-table audit key created below.
master_audit_key = get_new_audit_key(step_name='Master',table_name='Master',cnxn_string=cnxn_string)
#tasks
#download files
# The download task's result is unpacked into three outputs.
invoice_files,supplier_files,terms_file = download_files_from_sftp(rsa,working_directory)
#High-level objects
supplier = load_supplier_csvs(supplier_files,terms_file)
invoice = load_invoice_csvs(invoice_files)
# `supplier` is rebound here: everything below uses the coalesced result, not the raw CSV load.
supplier = coalesce_supplier_lists(supplier,invoice)
#Record Counts
supplier_extract_count = get_record_count(supplier)
invoice_extract_count = get_record_count(invoice)
#Supplier NK Tasks
supplier_nk_audit_key = get_new_audit_key(step_name='SupplierNaturalKey-Load',table_name='SupplierNaturalKey',cnxn_string=cnxn_string,files=supplier_files,parent_audit_key=master_audit_key)
supplier_nk_to_insert = supplier_nk_left_join_to_supplier_nk_table(supplier,cnxn_string,supplier_nk_audit_key)
supplier_nk_load = load_to_sql_server(supplier_nk_to_insert,'SupplierNaturalKey',cnxn_string)
supplier_nk_load_count = get_record_count(supplier_nk_to_insert)
# upstream_tasks adds an ordering-only dependency: the audit row is updated after supplier_nk_load.
update_audit_details(cnxn_string=cnxn_string,extract_count=supplier_extract_count,insert_count=supplier_nk_load_count,success_ind='Y',audit_key=supplier_nk_audit_key,upstream_tasks=[supplier_nk_load])
#Supplier Tasks
supplier_audit_key = get_new_audit_key(step_name='Supplier-Load',table_name='Supplier',cnxn_string=cnxn_string,files=supplier_files,parent_audit_key=master_audit_key)
# Ordered after supplier_nk_load although no data from that task is passed in.
supplier_join_to_nk = supplier_inner_join_to_supplier_nk(supplier,cnxn_string,upstream_tasks=[supplier_nk_load])
suppliers_to_insert,existing_suppliers = supplier_left_join_to_supplier_table(supplier_join_to_nk,supplier_audit_key,cnxn_string)
supplier_load = load_to_sql_server(suppliers_to_insert,'Supplier',cnxn_string)
supplier_load_count = get_record_count(suppliers_to_insert)
suppliers_to_update = supplier_filter_for_changes_only(existing_suppliers,cnxn_string)
supplier_update = update_supplier_records(suppliers_to_update,cnxn_string)
suppliers_to_update_count=get_record_count(suppliers_to_update)
# The supplier audit update is ordered after both the insert and the update task.
update_audit_details(cnxn_string=cnxn_string,extract_count=supplier_extract_count,insert_count=supplier_load_count,update_count=suppliers_to_update_count,success_ind='Y',audit_key=supplier_audit_key,upstream_tasks=[supplier_load,supplier_update])
#Invoice NK Tasks
invoice_nk_audit_key = get_new_audit_key(step_name='InvoiceNaturalKey-Load',table_name='InvoiceNaturalKey',cnxn_string=cnxn_string,files=invoice_files,parent_audit_key=master_audit_key)
invoice_nk_to_insert = invoice_left_join_invoice_nk(invoice,invoice_nk_audit_key,cnxn_string)
invoice_nk_load = load_to_sql_server(invoice_nk_to_insert,'InvoiceNaturalKey',cnxn_string)
invoice_nk_load_count = get_record_count(invoice_nk_to_insert)
update_audit_details(cnxn_string=cnxn_string,extract_count=invoice_extract_count,insert_count=invoice_nk_load_count,success_ind='Y',audit_key=invoice_nk_audit_key,upstream_tasks=[invoice_nk_load])
#Invoices
invoice_audit_key = get_new_audit_key(step_name='Invoice-Load',table_name='Invoice',cnxn_string=cnxn_string,files=invoice_files,parent_audit_key=master_audit_key)
invoices_join_to_nk = invoice_inner_join_to_inv_nk(invoice,cnxn_string,upstream_tasks=[invoice_nk_load])
#Split invoices from their custom fields
invoices,invoice_custom_fields = split_invoice_and_custom_fields_tables(invoices_join_to_nk)
#Invoice join to supplier then load / update
# Ordered after supplier_load, so the supplier insert runs before invoices are joined to suppliers.
invoices_join_to_supplier = invoice_inner_join_to_supplier_nk(invoices,cnxn_string,upstream_tasks=[supplier_load])
invoices_to_insert,invoices_to_update = invoice_left_join_to_invoice_table(invoices_join_to_supplier,invoice_audit_key,cnxn_string)
invoice_load = load_to_sql_server(invoices_to_insert,'Invoice',cnxn_string)
invoice_update = update_invoice_records(invoices_to_update,cnxn_string)
invoice_load_count = get_record_count(invoices_to_insert)
invoice_update_count = get_record_count(invoices_to_update)
#Load cfs / update
inv_cfs_to_insert,inv_cfs_to_update = invoice_cf_left_join_to_invoice_cf_table(invoice_custom_fields,cnxn_string)
# The custom-field insert is ordered after the invoice insert.
inv_cf_load = load_to_sql_server(inv_cfs_to_insert,'InvoiceCustomField',cnxn_string,upstream_tasks=[invoice_load])
inv_cf_update = update_invoice_cf_records(inv_cfs_to_update,cnxn_string)
#Audit
# NOTE(review): unlike the supplier audit above (which waits for supplier_update), this waits only
# for invoice_load and inv_cf_load; invoice_update and inv_cf_update are not upstream of any audit
# or cleanup step, so success_ind='Y' may be written before they finish - confirm this is intended.
update_audit = update_audit_details(cnxn_string=cnxn_string,extract_count=invoice_extract_count,insert_count=invoice_load_count,update_count=invoice_update_count,success_ind='Y',audit_key=invoice_audit_key,upstream_tasks=[invoice_load,inv_cf_load])
# NOTE(review): the master audit is ordered only after the invoice audit update; the supplier and
# natural-key audit updates above are not listed as upstream - confirm this is intended.
update_master_audit = update_audit_details(cnxn_string=cnxn_string,success_ind='Y',audit_key=master_audit_key,upstream_tasks=[update_audit])
#Cleanup
# Both cleanup steps are ordered after the master audit update; the landing-area cleanup also waits on `supplier`.
clean_landing_area(working_directory,upstream_tasks=[update_master_audit,supplier])
capture_execute_post_processing(cnxn_string,upstream_tasks=[update_master_audit])
I've changed the parameter defaults for privacy.
Anna Geller
Anna Geller
Adam Everington
11/15/2021, 10:53 AM
Adam Everington
11/15/2021, 10:53 AM
Anna Geller
Adam Everington
11/15/2021, 10:54 AM
Adam Everington
11/15/2021, 10:55 AM
Anna Geller
Adam Everington
11/15/2021, 10:55 AM
Anna Geller
Adam Everington
11/15/2021, 10:56 AM
Anna Geller
export PREFECT__CLOUD__USE_LOCAL_SECRETS=false
Could it be that you currently have some Secrets defined locally in config.toml, so they exist on your local machine but not on Azure?
Adam Everington
11/15/2021, 11:07 AMflow.register(
project_name='Capture-ETL',
# Using the flow's serialized hash as the idempotency key: per the Prefect docs, re-registering
# an unchanged flow with the same key does not create a new flow version.
idempotency_key=flow.serialized_hash(),
# Labels supplied at registration time (the question below asks about the run config as the alternative).
labels=['Cloud01']
)
or through the run config? (or both / either!)
Anna Geller
from prefect.run_configs import UniversalRun
from prefect import task, Flow
with Flow("Load-Data-To-Azure"
, executor=LocalDaskExecutor()
, storage=Git(
flow_path = ".my-flow.py",
repo="prefect",
branch_name="main",
git_clone_url_secret_name="DEVOPS_URL"
),
# Same flow header as the earlier snippet, with one addition: labels are attached through the
# run config (UniversalRun). "label" is a placeholder value; the flow body is omitted here.
run_config=UniversalRun(labels=["label"])
) as flow:
Adam Everington
11/15/2021, 11:20 AM
Anna Geller
Anna Geller
Adam Everington
11/15/2021, 11:24 AM
Adam Everington
11/15/2021, 1:09 PM
Anna Geller
Anna Geller
flow.run_config = LocalRun(
# Placeholder path: working_dir is the directory the local flow-run process is started in
# (per the Prefect LocalRun docs).
working_dir="/path/to/working-directory"
)
And when you start a local agent, you can add a path this way:
prefect agent local start -p /Users/adam/some_prefect_modules/