Harish
07/28/2021, 11:41 PMfrom prefect import Flow, task
from prefect.tasks.shell import ShellTask
import prefect
def test_on_cancel(flow, old_state, new_state):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(old_state)
<http://logger.info|logger.info>(new_state)
@task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
def plus_one(x):
"""A task that adds 1 to a number"""
return x + 1
@task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
def build_command():
return 'sleep 200'
run_in_bash = ShellTask(name='run a command in bash', on_failure=test_on_cancel, state_handlers=[test_on_cancel])
with Flow('Best Practices') as flow:
two = plus_one(1)
cmd = build_command()
shell_result = run_in_bash(command=cmd)
shell_result.set_upstream(two)
Kevin Kho
Harish
07/29/2021, 1:49 AMKevin Kho
Harish
07/29/2021, 2:16 AMKevin Kho
Failed
is different from Cancelled
so on_failure
won’t work. Can you try something like:
def test_on_cancel(flow, old_state, new_state):
if new_state.is_cancelled():
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(old_state)
<http://logger.info|logger.info>(new_state)
@task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
def plus_one(x):
"""A task that adds 1 to a number"""
return x + 1
Harish
07/29/2021, 6:27 PMException raised while calling state handlers: AttributeError("'Running' object has no attribute 'is_cancelled'",)
Traceback (most recent call last):
File "/home/prefect/prefect_code/lib/python3.6/site-packages/prefect/engine/cloud/task_runner.py", line 65, in call_runner_target_handlers
old_state=old_state, new_state=new_state
File "/home/prefect/prefect_code/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 114, in call_runner_target_handlers
new_state = handler(self.task, old_state, new_state) or new_state
File "<string>", line 11, in test_on_cancel
AttributeError: 'Running' object has no attribute 'is_cancelled'
Kevin Kho
if isinstance(new_state, Cancelled)
, where cancelled comes from prefect.engine.state
Harish
07/29/2021, 6:44 PM14:38:35 INFO CloudTaskRunner
Task 'run a command in bash': Starting task run...
14:38:49 INFO CloudFlowRunner
Flow run has been cancelled, cancelling active tasks
Kevin Kho
Harish
07/29/2021, 7:01 PM