# ask-marvin
s
This message was deleted.
m
Your message was too long by 37 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
I would like to use the following as an example:
    from prefect import task, flow
    from prefect.states import Completed, Failed


    @task
    def always_fails_task():
        raise ValueError("I fail successfully")


    @task
    def always_succeeds_task():
        print("I'm fail safe!")
        return "success"


    @flow
    def always_succeeds_flow():
        x = always_fails_task.submit()
        y = always_succeeds_task.submit()
        if y.result() == "success":
            return Completed(message="I am happy with this result")
        else:
            return Failed(message="How did this happen!?")


    if __name__ == "__main__":
        always_succeeds_flow()

In particular, I want to take this feature:

    if y.result() == "success":
        return Completed(message="I am happy with this result")

and bring it into my code:

    @task(task_run_name=generate_task_name)
    def run_uploader(uploader: Uploader):
        uploader.load()


    @flow(flow_run_name=generate_flow_run_name)
    async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
        data_contract = get_data_contract(asset_id)
        await remove_ok_file(data_contract)
        await data_dock_to_data_gateway(data_contract, schedule_id)
        uploaders = get_uploaders_from_data_contract(data_contract, datetime)
        upload_tasks = run_uploader.map(uploaders)
        await