Slackbot
12/11/2024, 4:27 PM
Marvin
12/11/2024, 4:27 PM
I would like to use the following as an example:

    from prefect import task, flow
    from prefect.states import Completed, Failed

    @task
    def always_fails_task():
        raise ValueError("I fail successfully")

    @task
    def always_succeeds_task():
        print("I'm fail safe!")
        return "success"

    @flow
    def always_succeeds_flow():
        x = always_fails_task.submit()
        y = always_succeeds_task.submit()
        if y.result() == "success":
            return Completed(message="I am happy with this result")
        else:
            return Failed(message="How did this happen!?")

    if __name__ == "__main__":
        always_succeeds_flow()

in particular this feature:

    if y.result() == "success":
        return Completed(message="")

and bring it into my code:

    @task
    def run_uploader(uploader: Uploader):
        uploader.load()

    @flow
    async def single_asset_uploader(asset_id: str) -> None:
        data_contract = get_data_contract(asset_id)
        await remove_ok_file(data_contract)
        await data_dock_to_data_gateway(data_contract, schedule_id)
        uploaders = get_uploaders_from_data_contract(data_contract, datetime)
        upload_tasks = run_uploader.map(uploaders)
        await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)

In particular, when one of the tasks goes into the Crashed state, I would like to force the
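
The message breaks off before it says which outcome should be forced, so what follows is only a rough sketch of the decision step in the quoted pattern, applied to the mapped uploader tasks. It is written against a minimal state shape rather than Prefect itself: the only assumption about a task state is that it exposes `is_completed()`, as Prefect's `State` objects do. The names `StateLike` and `decide_flow_outcome` are hypothetical and do not come from Prefect or from the code above.

```python
from typing import Iterable, Protocol, Tuple


class StateLike(Protocol):
    """Minimal shape assumed for a task state (hypothetical helper type)."""

    def is_completed(self) -> bool: ...


def decide_flow_outcome(states: Iterable[StateLike]) -> Tuple[bool, str]:
    """Return (ok, message) based on the final states of the mapped tasks.

    Any state that is not Completed (for example Failed or Crashed)
    makes the overall outcome not ok.
    """
    states = list(states)
    not_completed = [s for s in states if not s.is_completed()]
    if not_completed:
        return False, (
            f"{len(not_completed)} of {len(states)} uploader task(s) did not complete"
        )
    return True, f"all {len(states)} uploader task(s) completed"
```

Inside `single_asset_uploader`, one possible wiring (assuming a Prefect version where each future in `upload_tasks` offers `wait()` and a `state` attribute) would be to wait on every future, pass their states to this helper, and then return `Completed(message=...)` or `Failed(message=...)` depending on the result, mirroring the `if y.result() == "success"` branch in the example. The flow's `-> None` annotation would need to change if it returns a state.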