Hi, I found something interesting about Cancelling...
# ask-community
l
Hi, I found something interesting about cancelling and failing a run. Whenever I press Cancel on my flow run, I'd expect all tasks to stop and to report `state.Cancelling` and `state.Cancelled` as `True`, but neither happens. They only register as `Cancelling` or `Cancelled` when I set the flow run's state to Cancelled in the UI. Another weird thing: if I set the state to Cancelled, my tasks keep running until they finish (either failing or succeeding). I'm currently working on state handlers. How can I be sure that I'm cancelling my flow runs and, consequently, the tasks for that flow run?
The state of the flow is already `Cancelled`, but the task still shows as `Running` (though I know the task is paused/stopped, because there are no outputs).
a
Cancellation is a best-effort attempt to kill execution initiated on your infrastructure. Unfortunately there is no guarantee, because some jobs cannot be killed via an API call alone. There are ways to mitigate this: e.g. if you use a temporary Dask cluster for each flow run, Prefect can stop and remove the cluster to force cancellation of the task execution.
l
Oh, I see. My goal is a state handler that catches `Cancelled` or `Failed` flow runs and sends a POST to Databricks to cancel the submitJob/submitRun, since a Databricks task only calls the POST run-job endpoint and doesn't call `Cancel/Stop` when the flow stops.
a
I totally understand why this would be valuable, but currently it’s not possible to do that. When you Cancel a flow run, the flow run exits immediately so it’s not possible to react to a Cancelled state via state handler because the flow run is already finished upon cancellation. There is an open issue for it but it could be that this will be tackled more elegantly in Orion with the Crashed state.
Cancelled is a child state of Finished - that’s why it’s a terminal state you cannot react to using a state handler
l
Hey Anna, thanks! I've found a possible solution: adding the `state_handler` to the Flow and then sending a parameter with a condition for cancelled runs. I'm using a `LocalDaskExecutor`, which should be fine for sending the request to `https://<databricks-instance>/api/2.1/jobs/runs/cancel`.
a
You’re right, I was misled by the fact that the State class doesn’t have the method “is_cancelling” or “is_cancelled”, but doing it this way actually works: https://gist.github.com/69a38677b8ab56c2da2d80eeea9ea1c4 using:
```python
if isinstance(new_state, prefect.engine.state.Cancelled):
    ...  # react to the cancellation here
```
l
Yep! I've actually found a way to make it work. Of course there could be some issues with latency/lag between the task, flow, Prefect, and Databricks, but I'd say it works pretty well!
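```python
import json

import requests
from prefect.engine import state

# DOMAIN, RUN_ID and headers are module-level globals defined elsewhere.


def canceled_state_handler(task, old_state, new_state):
    # Call Databricks when the run is being cancelled or has been cancelled.
    if isinstance(new_state, (state.Cancelling, state.Cancelled)):
        cancel_run()
    return new_state


def cancel_run():
    # Ask Databricks to cancel the run that was submitted earlier.
    cancel_url = f"{DOMAIN}api/2.0/jobs/runs/cancel"
    cancel_payload = json.dumps({"run_id": RUN_ID})
    requests.post(cancel_url, headers=headers, data=cancel_payload)
```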
This is what I've managed to do. Still testing, but it turned out pretty good. (I used some global vars and am still trying to find a way to avoid them with state handlers.)
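One possible way around the global vars, as a rough sketch rather than a tested Prefect recipe: build the handler with a factory function so the settings live in a closure. The names `make_cancel_handler`, `post_json`, and `get_run_id` are hypothetical, the `api/2.0` path is copied from the snippet above, and the state classes are passed in so the sketch does not depend on a particular Prefect version.

```python
import json
import urllib.request
from typing import Callable, Optional, Tuple


def post_json(url: str, payload: dict, headers: dict) -> None:
    """POST a JSON body using only the standard library (hypothetical helper)."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={**headers, "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30):
        pass


def make_cancel_handler(
    domain: str,
    headers: dict,
    get_run_id: Callable[[], Optional[int]],
    cancel_states: Tuple[type, ...],
    post: Callable[[str, dict, dict], None] = post_json,
):
    """Build a state handler that closes over its settings instead of globals."""
    cancel_url = f"{domain.rstrip('/')}/api/2.0/jobs/runs/cancel"

    def handler(obj, old_state, new_state):
        run_id = get_run_id()
        # Only call Databricks if a run was submitted and a cancel state was entered.
        if run_id is not None and isinstance(new_state, cancel_states):
            post(cancel_url, {"run_id": run_id}, headers)
        return new_state

    return handler
```

With Prefect 1.x this would presumably be wired up by passing `cancel_states=(state.Cancelling, state.Cancelled)` from `prefect.engine.state` and attaching the returned handler through the Flow's `state_handlers` list. How `get_run_id` learns the Databricks run id is left open here, since that depends on how the submitting task stores it.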
a
nice work! 🙌