I get a Failed to load and execute Flow's environm...
# ask-community
a
I get a Failed to load and execute Flow's environment: TypeError("'type' object is not subscriptable") only on my AZ deployment of Prefect... locally it runs absolutely fine?
a
With AZ you mean Azure? 🙃 Can you share your flow definition?
a
Of courseeeeee.... (and yes Azure :P)
Copy code
# Builds the "Load-Data-To-Azure" Prefect flow.
# - Runs on a LocalDaskExecutor.
# - Uses Git storage: the flow file at `flow_path` is read from repo "prefect",
#   branch "main"; the clone URL comes from the secret named "DEVOPS_URL".
# Every function called inside the `with` block is a project task/helper that is
# not defined in this snippet, so the comments below describe only the wiring
# (data passing and explicit `upstream_tasks` ordering), not task internals.
with Flow("Load-Data-To-Azure"
    ,executor=LocalDaskExecutor()
    ,storage=Git(
            flow_path = ".my-flow.py",
            repo="prefect",
            branch_name="main",
            git_clone_url_secret_name="DEVOPS_URL"
        )
    ) as flow:    

    # Params: run-time inputs with defaults.
    server = Parameter('Server Name',default='my-azure-server')
    database= Parameter('Database Name',default='MyDb')
    working_directory = Parameter('Working Directory',default=r'\\full-unc-to-dir')
    user = Parameter('SQL User Name', default='MyUser')

    # Secrets: password looked up from the `user` parameter via the project
    # helper get_secret_value (not shown); RSA key read from the Prefect secret
    # 'SFTP_RSA_KEY'.
    pw = get_secret_value(user)
    rsa = PrefectSecret('SFTP_RSA_KEY')

    # Connection string assembled from the parameters and the password.
    cnxn_string = get_cnxn_string(server,database,user,pw)

    # Master audit key: passed as parent_audit_key to every per-step audit key
    # below and closed out by the final update_audit_details call.
    master_audit_key = get_new_audit_key(step_name='Master',table_name='Master',cnxn_string=cnxn_string)

    # Tasks
    # Download files: the task result is unpacked into three values.
    # NOTE(review): unpacking a task result presumably relies on the task
    # declaring its number of outputs (e.g. `nout=3`) -- confirm in the task definition.
    invoice_files,supplier_files,terms_file = download_files_from_sftp(rsa,working_directory)

    # High-level objects
    supplier = load_supplier_csvs(supplier_files,terms_file)
    invoice = load_invoice_csvs(invoice_files)
    # `supplier` is rebound here: everything downstream uses the result of
    # coalesce_supplier_lists(supplier, invoice), not the raw CSV load.
    supplier = coalesce_supplier_lists(supplier,invoice)
    # Record counts of the extracted data (supplier count is taken after the coalesce).
    supplier_extract_count = get_record_count(supplier)
    invoice_extract_count = get_record_count(invoice)

    # Supplier NK Tasks: new audit key -> rows to insert -> load into
    # 'SupplierNaturalKey' -> audit update, which waits on the load.
    supplier_nk_audit_key = get_new_audit_key(step_name='SupplierNaturalKey-Load',table_name='SupplierNaturalKey',cnxn_string=cnxn_string,files=supplier_files,parent_audit_key=master_audit_key)
    supplier_nk_to_insert = supplier_nk_left_join_to_supplier_nk_table(supplier,cnxn_string,supplier_nk_audit_key)
    supplier_nk_load = load_to_sql_server(supplier_nk_to_insert,'SupplierNaturalKey',cnxn_string)
    supplier_nk_load_count = get_record_count(supplier_nk_to_insert)
    update_audit_details(cnxn_string=cnxn_string,extract_count=supplier_extract_count,insert_count=supplier_nk_load_count,success_ind='Y',audit_key=supplier_nk_audit_key,upstream_tasks=[supplier_nk_load])

    # Supplier Tasks: the join to the natural-key table is ordered after
    # supplier_nk_load via upstream_tasks (no data is passed from the load).
    supplier_audit_key = get_new_audit_key(step_name='Supplier-Load',table_name='Supplier',cnxn_string=cnxn_string,files=supplier_files,parent_audit_key=master_audit_key)
    supplier_join_to_nk = supplier_inner_join_to_supplier_nk(supplier,cnxn_string,upstream_tasks=[supplier_nk_load])
    suppliers_to_insert,existing_suppliers = supplier_left_join_to_supplier_table(supplier_join_to_nk,supplier_audit_key,cnxn_string)
    supplier_load = load_to_sql_server(suppliers_to_insert,'Supplier',cnxn_string)
    supplier_load_count = get_record_count(suppliers_to_insert)
    suppliers_to_update = supplier_filter_for_changes_only(existing_suppliers,cnxn_string)
    supplier_update = update_supplier_records(suppliers_to_update,cnxn_string)
    suppliers_to_update_count=get_record_count(suppliers_to_update)
    # Audit update waits on both the insert (supplier_load) and the update (supplier_update).
    update_audit_details(cnxn_string=cnxn_string,extract_count=supplier_extract_count,insert_count=supplier_load_count,update_count=suppliers_to_update_count,success_ind='Y',audit_key=supplier_audit_key,upstream_tasks=[supplier_load,supplier_update])

    # Invoice NK Tasks: same shape as the Supplier NK step, for 'InvoiceNaturalKey'.
    invoice_nk_audit_key = get_new_audit_key(step_name='InvoiceNaturalKey-Load',table_name='InvoiceNaturalKey',cnxn_string=cnxn_string,files=invoice_files,parent_audit_key=master_audit_key)
    invoice_nk_to_insert = invoice_left_join_invoice_nk(invoice,invoice_nk_audit_key,cnxn_string)
    invoice_nk_load = load_to_sql_server(invoice_nk_to_insert,'InvoiceNaturalKey',cnxn_string)
    invoice_nk_load_count = get_record_count(invoice_nk_to_insert)
    update_audit_details(cnxn_string=cnxn_string,extract_count=invoice_extract_count,insert_count=invoice_nk_load_count,success_ind='Y',audit_key=invoice_nk_audit_key,upstream_tasks=[invoice_nk_load])

    # Invoices: the join to the natural-key table is ordered after invoice_nk_load.
    invoice_audit_key = get_new_audit_key(step_name='Invoice-Load',table_name='Invoice',cnxn_string=cnxn_string,files=invoice_files,parent_audit_key=master_audit_key)
    invoices_join_to_nk = invoice_inner_join_to_inv_nk(invoice,cnxn_string,upstream_tasks=[invoice_nk_load])
    # Split the joined result into the invoice table and its custom-fields table.
    invoices,invoice_custom_fields = split_invoice_and_custom_fields_tables(invoices_join_to_nk)
    
    # Invoice join to supplier (ordered after supplier_load), then load / update.
    invoices_join_to_supplier = invoice_inner_join_to_supplier_nk(invoices,cnxn_string,upstream_tasks=[supplier_load])
    invoices_to_insert,invoices_to_update = invoice_left_join_to_invoice_table(invoices_join_to_supplier,invoice_audit_key,cnxn_string)
    invoice_load = load_to_sql_server(invoices_to_insert,'Invoice',cnxn_string)
    invoice_update = update_invoice_records(invoices_to_update,cnxn_string)
    invoice_load_count = get_record_count(invoices_to_insert)
    invoice_update_count = get_record_count(invoices_to_update)

    # Load custom fields / update: the 'InvoiceCustomField' load is ordered after invoice_load.
    inv_cfs_to_insert,inv_cfs_to_update = invoice_cf_left_join_to_invoice_cf_table(invoice_custom_fields,cnxn_string)
    inv_cf_load = load_to_sql_server(inv_cfs_to_insert,'InvoiceCustomField',cnxn_string,upstream_tasks=[invoice_load])
    inv_cf_update = update_invoice_cf_records(inv_cfs_to_update,cnxn_string)

    # Audit
    # NOTE(review): this audit update waits on invoice_load and inv_cf_load only;
    # invoice_update and inv_cf_update are not listed as upstream tasks, unlike the
    # Supplier audit above, which also waits on supplier_update. Confirm whether
    # success_ind='Y' is meant to be written before the update tasks have finished.
    update_audit = update_audit_details(cnxn_string=cnxn_string,extract_count=invoice_extract_count,insert_count=invoice_load_count,update_count=invoice_update_count,success_ind='Y',audit_key=invoice_audit_key,upstream_tasks=[invoice_load,inv_cf_load])
    # The master audit record is closed only after the Invoice audit update.
    update_master_audit = update_audit_details(cnxn_string=cnxn_string,success_ind='Y',audit_key=master_audit_key,upstream_tasks=[update_audit])
    # Cleanup: both steps are ordered after the master audit update; the landing
    # area cleanup additionally lists the coalesced `supplier` task as upstream.
    clean_landing_area(working_directory,upstream_tasks=[update_master_audit,supplier])
    capture_execute_post_processing(cnxn_string,upstream_tasks=[update_master_audit])
I've changed the param defaults for privacy
a
@Adam Everington I think you are missing the run configuration
a
OK, so I'm doing it on a Universal run using a local agent on the host machine (the same machine as the server)... isn't that the default, and therefore doesn't require specifying?
a
ideally, you should define it explicitly to avoid any issues. So you’re using Local agent both on your local machine and on Azure VM?
a
Yep!
I'll define it, redeploy and see if it helps though
a
in that case, it would be best if you assign a different label to each
a
I do... the one on the VM has a label called "Cloud01" and the local one has the agent label "local"
a
then you can run locally e.g. with label “dev” and once you are ready to deploy to Azure, you switch the label e.g. to “prod”
a
let me just re-do everything from scratch and i'll report back... like always, i'm sure you're spot on and i've missed something
a
As long as labels are set, your flow should work fine on both. You use Prefect Cloud, correct? In that case, you can use Secrets set in Prefect Cloud UI and you can set:
Copy code
export PREFECT__CLOUD__USE_LOCAL_SECRETS=false
it can be that you currently have some Secrets defined locally in config.toml and those are on a local machine but not on Azure?
a
I'm using server atm on a VM, secrets are defined both sides no problem. Do I define my labels on register ie.
Copy code
# Register the flow under the 'Capture-ETL' project, attaching the label 'Cloud01'.
# The idempotency key is the flow's serialized hash.
# NOTE(review): presumably this keeps re-registering an unchanged flow from
# creating a new version -- confirm against the Prefect docs.
flow.register(
    project_name='Capture-ETL',
    idempotency_key=flow.serialized_hash(),
    labels=['Cloud01']
)
or through the run config? (or both / either!)
a
on run config
Copy code
from prefect.run_configs import UniversalRun
from prefect import task, Flow

with Flow("Load-Data-To-Azure"
        , executor=LocalDaskExecutor()
        , storage=Git(
            flow_path = ".my-flow.py",
            repo="prefect",
            branch_name="main",
            git_clone_url_secret_name="DEVOPS_URL"
        ),
    run_config=UniversalRun(labels=["label"])
          ) as flow:
👍 1
a
no luck... same error, i've added the run config in as suggested (did it under a Universal rather than Local though) and still get that error message
a
Can you confirm that you use the same Prefect version across your:
• Server
• local agent
• local agent on the Azure VM
• the environment from which you register your flows?
The Server version must be >= the Prefect version used in the flow registration and execution environments.
ideally, you can match the Python version as well to avoid any surprises
a
cool, let me just go ahead and check all of the above
OK... so... My VM had Python 3.7 installed; I upgraded that to 3.9.5 (my local dev env is 3.9.1). My local dev / AZ agent and AZ server are now all running on Prefect 0.15.9. The old error is gone and I'm now getting: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '_cffi_backend'"). I have had to reinstall all my Python packages, as I didn't realise upgrading Python would remove all my modules!
a
I think this is an issue related to some modules that you had on your local machine but don't have on your Azure instance. I will post an example of how you can add a path to your agent. Btw, did you know that Prefect Cloud now has 20,000 free task runs each month? Prefect Cloud is much easier to operate; right now you are facing a DevOps problem related to Server.
If you switch to a LocalRun, you can add a working directory for your flow and this could point to a location of where your custom modules are stored:
Copy code
# Set the flow's run configuration to a LocalRun with an explicit working directory.
# The path below is a placeholder.
# NOTE(review): LocalRun is not imported in this snippet; presumably it comes
# from prefect.run_configs, like UniversalRun -- confirm.
flow.run_config = LocalRun(
    working_dir="/path/to/working-directory"
)
and when you start a local agent you can add a path this way:
Copy code
prefect agent local start -p /Users/adam/some_prefect_modules/
🤟 1