    Lucas Hosoya

    8 months ago
    Hi, I found something interesting about cancelling and failing a run. Whenever I press Cancel on my flow run, all tasks should be stopped and `state.Cancelling` and `state.Cancelled` should be `True`, but neither happens. The run only registers as `Cancelling` or `Cancelled` when I manually set the flow run's state to Cancelled in the UI. Another odd thing: even if I set the state to Cancelled, my tasks keep running until they finish (either failing or succeeding). I'm currently working on state handlers. How can I be sure that I'm cancelling my flow runs and, consequently, the tasks in that flow run?
    The state of the flow is already `Cancelled`, but the task still shows as `Running` (although I know the task is paused/stopped, because there is no output).
    Anna Geller

    8 months ago
    Cancellation is a best-effort attempt to kill execution initiated on your infrastructure, but unfortunately there is no guarantee, because some jobs cannot be killed just via an API call. There are some ways to mitigate this, e.g. if you use a temporary Dask cluster for each flow run, Prefect can stop and remove the cluster to force cancellation of the task execution.
    Lucas Hosoya

    8 months ago
    Oh, I see. My goal is to have a state handler that detects `Cancelled` or `Failed` flow runs and sends a POST to Databricks to cancel the submitJob/submitRun, since a Databricks task only calls the POST run job endpoint and doesn't call `Cancel/Stop` when the flow stops.
    Anna Geller

    8 months ago
    I totally understand why this would be valuable, but currently it’s not possible to do that. When you cancel a flow run, the flow run exits immediately, so it’s not possible to react to a Cancelled state via a state handler, because the flow run is already finished upon cancellation. There is an open issue for it, but it may be tackled more elegantly in Orion with the Crashed state.
    Cancelled is a child state of Finished, which is why it’s a terminal state you cannot react to using a state handler.
    Lucas Hosoya

    8 months ago
    Hey Anna, thanks! I've managed to find a possible solution: adding the `state_handler` to the Flow and then sending a parameter with a condition for cancelled runs. I'm using a `LocalDaskExecutor`, which should be fine for sending the POST to `https://<databricks-instance>/api/2.1/jobs/runs/cancel`.
    Anna Geller

    8 months ago
    You’re right. I was misled by the fact that the State class doesn’t have an `is_cancelling` or `is_cancelled` method, but doing it this way actually works: https://gist.github.com/69a38677b8ab56c2da2d80eeea9ea1c4 using:
    if isinstance(new_state, prefect.engine.state.Cancelled):
    Lucas Hosoya

    8 months ago
    Yep! I've actually found a way that works. Of course, there could be some issues with latency/lag between the task, flow, Prefect, and Databricks, but I'd say it works pretty well!
    import json

    import requests
    from prefect.engine import state

    # DOMAIN, RUN_ID and headers are module-level globals defined elsewhere.

    def canceled_state_handler(flow, old_state, new_state):
        # Attached to the Flow: trigger the Databricks cancel on Cancelling or Cancelled.
        if isinstance(new_state, (state.Cancelling, state.Cancelled)):
            cancel_run()
        return new_state

    def cancel_run():
        # Ask Databricks to cancel the run that was submitted earlier.
        cancel_url = f"{DOMAIN}api/2.0/jobs/runs/cancel"
        cancel_payload = json.dumps({"run_id": RUN_ID})
        requests.post(cancel_url, headers=headers, data=cancel_payload)
    This is what I've managed to do. I'm still testing, but it turned out pretty well. (I used some global vars and am trying to find a way to avoid them with state handlers.)
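One possible way to avoid the global variables is to build the handler with a small factory, so that the cancel action and the state classes are bound in a closure. The following is only a sketch under assumptions: `make_cancel_handler`, `post_cancel`, `cancel_states` and `on_cancel` are hypothetical names that do not come from the thread, and the standard-library `urllib.request` is swapped in for `requests` so the snippet has no third-party dependencies.

```python
import json
import urllib.request
from typing import Callable, Tuple, Type


def post_cancel(domain: str, run_id: int, headers: dict) -> None:
    """POST a cancel request for one Databricks run (hypothetical helper)."""
    request = urllib.request.Request(
        url=f"{domain}api/2.0/jobs/runs/cancel",
        data=json.dumps({"run_id": run_id}).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    urllib.request.urlopen(request)


def make_cancel_handler(
    cancel_states: Tuple[Type, ...],
    on_cancel: Callable[[], None],
) -> Callable:
    """Return a state handler that calls on_cancel() for the given state classes."""

    def handler(obj, old_state, new_state):
        # Same check as in the thread, but the states and the action are injected.
        if isinstance(new_state, cancel_states):
            on_cancel()
        # State handlers return the state that should be used from here on.
        return new_state

    return handler
```

With Prefect 1.x this could presumably be wired up by passing `(state.Cancelling, state.Cancelled)` as `cancel_states` and a callable that looks up the Databricks run ID at call time and forwards it to `post_cancel`. Passing a callable rather than a fixed run ID matters because the run ID is usually only known after the job has been submitted.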
    Anna Geller

    8 months ago
    nice work! 🙌