sandeep kumar

04/21/2023, 10:49 AM
Hi All, we are migrating from v1 to v2, we wanted to implement something like below, but if sub flow fails parent flow also fails but wanted to run clean up flow regardless of upstream flows fail state
from prefect import task, flow, get_run_logger, allow_failure
from prefect.task_runners import SequentialTaskRunner

def flow_1(p: int) -> int:
    logger = get_run_logger()
    <|>(f" \n\n ----------------Flow 1 --{p}-------------\n\n")
    return 5

def flow_2(m: int):
    raise ValueError("Non-deterministic error has occured.")

def combine_data(p: list) -> int:
    logger = get_run_logger()
    <|>(f"-\n\n------------Combining data -- {p}-------------\n\n")
    return sum(p)

def additional_flow():
    logger = get_run_logger()
    <|>(f"-\n\n------------additional_flow -- {p}-------------\n\n")
def clean_up_task(result: int):
    logger = get_run_logger()
    <|>(f"\n\nCleaning up 🧹 --- {result}-----\n\n")
    <|>("\n\nCleaning ------------------------------ up 🧹\n\n")

def allow_flaky_transformation_to_pass():
    f1 = flow_1(1)
    f2 = flow_2(2)
    result = combine_data.submit(
        [f1, f2], wait_for=[allow_failure(f1), allow_failure(f2)]) # wait for all upstream flows
    clean_up_task.submit()  # This should always run even if all of the above flow fails

if __name__ == "__main__":
Need ur help @Zanie we are running into errors above code is failing