https://prefect.io logo
Title
s

sandeep kumar

04/21/2023, 10:49 AM
Hi All, we are migrating from v1 to v2, we wanted to implement something like below, but if sub flow fails parent flow also fails but wanted to run clean up flow regardless of upstream flows fail state
from prefect import task, flow, get_run_logger, allow_failure
from prefect.task_runners import SequentialTaskRunner

@flow
def flow_1(p: int) -> int:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f" \n\n ----------------Flow 1 --{p}-------------\n\n")
    return 5

@flow
def flow_2(m: int):
    raise ValueError("Non-deterministic error has occured.")


@flow
def combine_data(p: list) -> int:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"-\n\n------------Combining data -- {p}-------------\n\n")
    return sum(p)

@flow
def additional_flow():
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"-\n\n------------additional_flow -- {p}-------------\n\n")
@flow
def clean_up_task(result: int):
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"\n\nCleaning up 🧹 --- {result}-----\n\n")
    <http://logger.info|logger.info>("\n\nCleaning ------------------------------ up 🧹\n\n")


@flow(task_runner=SequentialTaskRunner)
def allow_flaky_transformation_to_pass():
    f1 = flow_1(1)
    f2 = flow_2(2)
    result = combine_data.submit(
        [f1, f2], wait_for=[allow_failure(f1), allow_failure(f2)]) # wait for all upstream flows
    additional_flow()
    clean_up_task.submit()  # This should always run even if all of the above flow fails


if __name__ == "__main__":
    allow_flaky_transformation_to_pass()
Need ur help @Zanie we are running into errors above code is failing