sandeep kumar
04/21/2023, 10:49 AMfrom prefect import task, flow, get_run_logger, allow_failure
from prefect.task_runners import SequentialTaskRunner
@flow
def flow_1(p: int) -> int:
logger = get_run_logger()
<http://logger.info|logger.info>(f" \n\n ----------------Flow 1 --{p}-------------\n\n")
return 5
@flow
def flow_2(m: int):
raise ValueError("Non-deterministic error has occured.")
@flow
def combine_data(p: list) -> int:
logger = get_run_logger()
<http://logger.info|logger.info>(f"-\n\n------------Combining data -- {p}-------------\n\n")
return sum(p)
@flow
def additional_flow():
logger = get_run_logger()
<http://logger.info|logger.info>(f"-\n\n------------additional_flow -- {p}-------------\n\n")
@flow
def clean_up_task(result: int):
logger = get_run_logger()
<http://logger.info|logger.info>(f"\n\nCleaning up 🧹 --- {result}-----\n\n")
<http://logger.info|logger.info>("\n\nCleaning ------------------------------ up 🧹\n\n")
@flow(task_runner=SequentialTaskRunner)
def allow_flaky_transformation_to_pass():
f1 = flow_1(1)
f2 = flow_2(2)
result = combine_data.submit(
[f1, f2], wait_for=[allow_failure(f1), allow_failure(f2)]) # wait for all upstream flows
additional_flow()
clean_up_task.submit() # This should always run even if all of the above flow fails
if __name__ == "__main__":
allow_flaky_transformation_to_pass()