https://prefect.io logo
Title
t

Tabari Brannon

05/12/2023, 12:58 PM
@flow
def georgia_table_flow(files):
    truncate = truncate_tables(files)
    import_stg = import_data(files)
    alter_import = switcheroo(files)


@flow
def georgia_etl():
    table_flows = [georgia_table_flow(file) for file in files]

georgia_etl()
These are my flows and my sub flow. I am having two problems 1. I want to name my subflow based upon where my main flow is in the loop this will be helpful to identify business logic tied to the subflow. Secondly when one of my subflows failes it stops the main flow. Is there away to allow the rest of the subflows to continue to run. Thanks!
1
d

Deceivious

05/12/2023, 1:26 PM
table_flows = [georgia_table_flow.with_options(name=f"new_name_{idx}")(file,return_state=True) for idx,file in enumerate(files)]
🙌 1
This should fix both your issue
f

Filip Panovski

05/12/2023, 1:27 PM
Alternatively for 2: You can submit the flow and then use
raise_on_failure=False
, e.g. :
@flow
def always_fails_flow():
    always_fails_task.submit().result(raise_on_failure=False)
    always_succeeds_task()
See docs.
t

Tabari Brannon

05/12/2023, 2:15 PM
Thank you this partially worked I had to modify it this way to get what I desired table_flows = [georgia_table_flow.with_options(name=f"new_name_"+file[idx])(file,return_state=True) for idx,file in enumerate(files)] Thanks! @Deceivious this now works perfectly.
🙌 1