# ask-community
h
Hi Prefect community, I'm new to Prefect. Does anyone know why the cancel/change state option in the Prefect UI does not result in the state handler function being called? Am I missing something? For example, after registering this flow, I don't see the logger info messages when I click cancel or change the state of the flow/tasks.
```python
from prefect import Flow, task
from prefect.tasks.shell import ShellTask
import prefect

def test_on_cancel(flow, old_state, new_state):
    logger = prefect.context.get("logger")
    logger.info(old_state)
    logger.info(new_state)

@task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
def plus_one(x):
    """A task that adds 1 to a number"""
    return x + 1

@task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
def build_command():
    return 'sleep 200'

run_in_bash = ShellTask(name='run a command in bash', on_failure=test_on_cancel, state_handlers=[test_on_cancel])

with Flow('Best Practices') as flow:
    two = plus_one(1)
    cmd = build_command()
    shell_result = run_in_bash(command=cmd)
    shell_result.set_upstream(two)
```
k
Hey @Harish, cancelling just stops processing entirely. What are you trying to do with the state handler on the cancelled flow?
h
Hi Kevin, I want to make sure that when someone cancels or changes a state in the UI, the corresponding state handler is called. For example, a task that created a Docker container should delete that container when someone clicks cancel or changes the state to failed for that task/flow.
k
Nice picture! Haha. I’d have to ask the team about this and get back to you tomorrow.
h
Hehe, thanks Kevin, I'd really appreciate it if you could ask them.
k
I chatted with the team. `Failed` is different from `Cancelled`, so `on_failure` won’t work. Can you try something like:
```python
def test_on_cancel(flow, old_state, new_state):
    if new_state.is_cancelled():
        logger = prefect.context.get("logger")
        logger.info(old_state)
        logger.info(new_state)

@task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
def plus_one(x):
    """A task that adds 1 to a number"""
    return x + 1
```
h
Hi @Kevin Kho, I tried this, but it fails on Prefect version 0.15.1:
```
Exception raised while calling state handlers: AttributeError("'Running' object has no attribute 'is_cancelled'",)
Traceback (most recent call last):
  File "/home/prefect/prefect_code/lib/python3.6/site-packages/prefect/engine/cloud/task_runner.py", line 65, in call_runner_target_handlers
    old_state=old_state, new_state=new_state
  File "/home/prefect/prefect_code/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 114, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "<string>", line 11, in test_on_cancel
AttributeError: 'Running' object has no attribute 'is_cancelled'
```
k
You’re right. We don’t have that method. Use `if isinstance(new_state, Cancelled)`, where `Cancelled` comes from `prefect.engine.state`.
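A minimal sketch of that check, for illustration only. It assumes Prefect 1.x (such as the 0.15.1 mentioned above), where `Cancelled` and `Running` are importable from `prefect.engine.state`. The fallback classes are stand-ins so the snippet also runs without that Prefect version installed, and `log_on_cancel` and `seen` are hypothetical names that are not part of Prefect.

```python
try:
    # Prefect 1.x (e.g. 0.15.x) exposes the state classes here.
    from prefect.engine.state import Cancelled, Running
except ImportError:
    # Stand-ins (assumption) so the sketch runs without Prefect 1.x installed.
    class State:
        pass

    class Running(State):
        pass

    class Cancelled(State):
        pass

seen = []  # hypothetical: names of the states the handler reacted to


def log_on_cancel(task, old_state, new_state):
    """State handler using the (task, old_state, new_state) signature."""
    # isinstance works for any state object, whereas Running has no
    # is_cancelled() method, which caused the AttributeError above.
    if isinstance(new_state, Cancelled):
        seen.append(type(new_state).__name__)
    # Return the state so the transition itself is left unchanged.
    return new_state


# Call the handler by hand with two transitions.
log_on_cancel(None, None, Running())
log_on_cancel(None, Running(), Cancelled())
print(seen)
```

The check no longer raises on a `Running` state, but it only helps if the runner actually invokes the handler for the transition in question.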
h
@Kevin Kho I tried it, but it's not working:
```
14:38:35 INFO CloudTaskRunner
Task 'run a command in bash': Starting task run...
14:38:49 INFO CloudFlowRunner
Flow run has been cancelled, cancelling active tasks
```
k
Unfortunately, that was my best effort. It looks like the state handlers are not hit in this case, because a state handler runs locally in the FlowRunner before the API request is sent to the UI to change the state of the task. Cancelling from the UI is a direct change to the state that propagates to the flow, so it won’t hit the state handler and just cancels the process. I don’t see a way to automatically spin down infrastructure unless it happens from the flow itself. Maybe the next best thing would be to send an alert on flow cancellation, prompting someone to check and spin things down?
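The ordering described above can be pictured with a toy model. This is not Prefect's implementation: `ToyBackend`, `ToyRunner`, and every method on them are hypothetical stand-ins that only mirror the explanation, namely that handlers run for transitions the runner makes itself, while a cancel issued through the UI changes the stored state directly.

```python
class ToyBackend:
    """Stand-in for the API/UI: stores the state it was last told about."""

    def __init__(self):
        self.state = "Pending"

    def set_state(self, state):
        # A direct state change, like clicking Cancel in the UI.
        self.state = state


class ToyRunner:
    """Stand-in for a local runner that owns the state handlers."""

    def __init__(self, backend, handlers):
        self.backend = backend
        self.handlers = handlers
        self.state = "Pending"

    def transition(self, new_state):
        # Handlers run locally first ...
        for handler in self.handlers:
            handler(self.state, new_state)
        self.state = new_state
        # ... and only then is the backend told about the new state.
        self.backend.set_state(new_state)

    def poll(self):
        # The runner notices an external cancellation and simply stops;
        # no handler is involved on this path.
        if self.backend.state == "Cancelled":
            self.state = "Cancelled"
            return False
        return True


calls = []
backend = ToyBackend()
runner = ToyRunner(backend, [lambda old, new: calls.append((old, new))])

runner.transition("Running")    # runner-initiated: the handler fires
backend.set_state("Cancelled")  # UI-initiated: goes straight to the backend
still_running = runner.poll()   # the runner stops without calling handlers

print(calls)
print(still_running, runner.state)
```

In this model the handler list only ever sees the `Pending` to `Running` transition, which matches the logs in the thread: the run is cancelled, but nothing is logged by the handler.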
h
Thank you Kevin, makes sense now.