Hi there - I’m trying to write a state handler for...
# ask-community
k
Hi there - I’m trying to write a state handler for a task that submits work to a service via a rest api. The state_handler code is:
Copy code
def nomad_batch_node_cancel_handler(task: Task, old_state: State, new_state: State):
        if (new_state == Cancelled) & (old_state == Running):
            <http://_LOGGER.info|_LOGGER.info>(f"killing job {nomad_job_name}")
            Nomad().stop_job(job_id=nomad_job_name)
It gets
nomad_job_name
from an enclosing function (it’s a closure). When I click the “cancel” button in the UI - I don’t think this handler is getting executed - my log message isn’t there, and the task is not killed. Is there something wrong with how I’m testing for state change? It seems to me that the task is in a
Running
state when I click cancel, and the UI seems to think it goes to a
Cancelled
state thereafter.
k
Hey @Kevin Weiler, unfortunately, state handlers are not passed during the flow cancellation. Related to this. I’d encourage you to chime in there. This is on our radar but no immediate timetable as it requires design of a new component (maybe)
k
thanks @Kevin Kho - would it make sense to rely on the “Set State” button here?
k
No because hitting the API like that just sets the state, but it’s the Flow Runner responsible for passing through state handlers before reporting a final state
k
so is there no way to stop tell running tasks to stop AND do something when stopped?
k
Only from the Flow side. Not from the Cloud side yet. The initial thoughts are to have agent side handlers that receive that cancelled signal and then shut down the hardware you want to shut down
k
Not sure what is meant by “only from the Flow side” - I don’t think it’s possible to interrupt a running flow other than through the API right? Also, for the record, we’re using Prefect Server - but I suspect it behaves the same as Prefect Cloud in this regard.
my task is in a 1-second polling loop when it gets cancelled - what is prefect actually doing to exit that loop? Maybe I can catch a signal from within that loop…
k
It does. If your state handler has both code to stop everything and then set the state, it might be able to exit gracefully, but when using the API, you set the state of the flow, it gets sent to the agent, and the agent does an aggressive cancellation of the running processes.
k
Right, which in my case is docker - presumably it sends SIGINT, waits for a few seconds, and then sends SIGKILL.
k
Looking at the flow runner and I am wrong about agent stopping it. The flow runner does
k
Is that the executor?
LocalDaskExecutor in my case
k
So there is this line with wraps the execution in the context manager, and then i believe it is handles on executor side
k
Oops - meeting - will have a look and get back - I’ll chime in on that thread either way. Thanks!
k
So then it goes <http://v|here> for cancellation I assume but haven’t exactly connected it.
k
☝️ - i think this link is broken
k
Real link . The DaskExecutors are honestly not too legible though