Andrew Nichol
05/21/2021, 4:47 PMKevin Kho
Andrew Nichol
05/21/2021, 4:54 PMKevin Kho
Andrew Nichol
05/21/2021, 8:45 PMdef run_on_cancel(task, old_state, new_state):
global some_global_variable
if isinstance(new_state, prefect_state.Cancelled):
logger = prefect_context.get("logger")
<http://logger.info|logger.info>(F"Running CANCELATION: {some_global_variable}")
raise Exception(f"exiting because of Cancelled state: {some_global_variable}")
return new_state
@task
def supposed_to_succeed():
global some_global_variable
some_global_variable = "beaver"
logger = prefect_context.get("logger")
<http://logger.info|logger.info>("Running success")
sleep(15)
return True
with Flow(
"failure-flow",
executor=LocalDaskExecutor(),
result=PrefectResult(),
state_handlers=[run_on_cancel]
) as failure_flow:
some_global_variable = None
succeed = supposed_to_succeed()
maybe_failure = supposed_to_fail(succeed)
last(maybe_failure)
Kevin Kho
Andrew Nichol
05/22/2021, 12:28 AM