Hey All I have a flow with "*state_handlers*" to ...
# ask-community
n
Hey all, I have a flow with "*state_handlers*" that post to Slack and some API. I have 2 questions about that:
1. When I change the flow's state from the UI to Failed or Cancelled, I don't get any message from the handler. It's as if the state handlers aren't triggered. Am I correct?
2. I want to make an API call to the Prefect server to change the state or cancel the flow_run. Is that supposed to trigger the state handlers?
k
Oof looks like this got buried by the post below. Changing from the UI is a direct insertion into our database so it won’t trigger the Failure state handlers. I believe for Cancelling it can be triggered, but not for Failed.
from prefect.engine.state import Cancelled

def mystatehandler(obj, old_state, new_state):
    if isinstance(new_state, Cancelled):
        ...
    return new_state
See this
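A minimal sketch of the handler pattern above, written as a small factory so that the state classes to watch and the notification function are passed in. `make_state_handler`, `notify`, and `watched_types` are hypothetical names used only for illustration; in a Prefect 1.x flow you would presumably pass `(Cancelled, Failed)` from `prefect.engine.state`.

```python
from typing import Any, Callable, Tuple, Type


def make_state_handler(
    notify: Callable[[str], None],
    watched_types: Tuple[Type, ...],
) -> Callable[[Any, Any, Any], Any]:
    """Build a state handler that calls `notify` when the new state matches."""

    def handler(obj: Any, old_state: Any, new_state: Any) -> Any:
        # Only react to the state classes the caller asked for.
        if isinstance(new_state, watched_types):
            notify(f"{type(old_state).__name__} -> {type(new_state).__name__}")
        # Returning the new state lets the transition proceed unchanged.
        return new_state

    return handler
```

With Prefect 1.x installed, this could be attached as `Flow("my-flow", state_handlers=[make_state_handler(print, (Cancelled, Failed))])`. Whether the handler actually fires still depends on how the state change is made, as discussed in this thread.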
n
Hey @Kevin Kho, thanks for the response. I want to change the state through the GraphQL API, but I have some difficulties:
1. The docs seem to be old, since I don't see set_flow_run_state in the interactive API.
2. I tried to use "set_flow_run_states", but I always get a syntax error that I can't figure out how to fix:
def set_flow_run_state_to_fail(
    flow_run_id: str,
) -> str:
    # Get the version id
    query = """ 

        query QueryFlowRun($flowRunId: uuid!){
        flow_run_by_pk(id: $flowRunId) {
            version
        }
        }
    """
    request_data = {
        "query": query,
        "variables": {"flowRunId": flow_run_id}
    }
    resp = requests.post(PREFECT_URL, json=request_data)
    resp.raise_for_status()
    version = resp.json()["data"]['flow_run_by_pk']["version"]
   
    query = """
        mutation($states: set_flow_run_states_input!){
        set_flow_run_states(input: $states) {
            states {
                id
            }
        }
    }
    """

    input_params = [{
        "state": json.dumps({
            "type": "Failed",
            "message": "Forced failure",
            }),
        "version": version,
        "flow_run_id": flow_run_id,
    }]
    logger.info(input_params)
    resp = requests.post(PREFECT_URL, json={
        "query": query, 
        "variables": {"states":input_params} 
        
    })
    resp.raise_for_status()

    return resp.json()["data"]["states"]["id"]
What could be the problem here? Thanks, Noam
k
I see what you mean. I think it will be easier if you just use Client.set_flow_run_state, or you can see its source code here because it takes some formatting.
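For reference, a rough sketch of the `Client.set_flow_run_state` route in Prefect 1.x. The wrapper name `force_flow_run_state` and the `example_usage` function are hypothetical, and the client is passed in as a parameter so the helper can be exercised without a server. The call assumes the signature `set_flow_run_state(flow_run_id, state, version=None)`.

```python
from typing import Any, Optional


def force_flow_run_state(
    client: Any,
    flow_run_id: str,
    state: Any,
    version: Optional[int] = None,
) -> Any:
    """Ask the backend to move a flow run into `state`.

    `client` is expected to behave like `prefect.Client` (Prefect 1.x);
    the client takes care of serializing the state and building the
    GraphQL mutation.
    """
    return client.set_flow_run_state(
        flow_run_id=flow_run_id, state=state, version=version
    )


def example_usage(flow_run_id: str) -> Any:
    """Illustrative only: needs Prefect 1.x and a reachable backend."""
    from prefect import Client
    from prefect.engine.state import Failed

    return force_flow_run_state(
        client=Client(),
        flow_run_id=flow_run_id,
        state=Failed(message="Forced failure"),
    )
```

Going through the client avoids hand-writing the mutation variables, which is where the raw GraphQL attempt above ran into trouble.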
n
It worked - changed the state to cancel or whatever I wanted but it still doesn't trigger my state handlers
k
Could I see your state handler definition? What is your RunConfig too? Kubernetes?
n
slack handler:
def post_to_slack_handler_fq(flow: Flow, old_state: State, new_state: State) -> State:
    if any([new_state.is_failed(), new_state.is_successful(), new_state.is_running(), new_state.is_pending(), isinstance(new_state, Cancelled)]):
        _post_to_slack_handler_fq(new_state, None)
    return new_state
kubernetes
k
Let me try this though I think I will run into the same thing.
Hey @Noam polak, I just finished testing this and state handlers on the task will work. Are you open to putting it there? I have an example here that you can copy, register, and run.
I take it back @Noam polak, it is the flow level state handler that works. I edited the Github above and these are my new logs:
How did you attach this to your Flow? Is it in the same file or a different one?
n
It's a different file, but I know from the logs that it uses the right env. Looking at the difference between my flow and yours, maybe it's related to the fact that mine triggers other flows, so when I cancel the main flow it causes errors in the child flows:
Failed to set task state with error: ClientError([{'message': 'State update failed for task run ID 9a5f3ce5-3239-410b-b665-08864644112f: provided a running state but associated flow run 48478ce6-2b40-4ef6-98a1-f97957f6c9eb is not in a running state.', 'locations': [{'line': 2, 'column': 5}], 'path': ['set_task_run_states'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'State update failed for task run ID 9a5f3ce5-3239-410b-b665-08864644112f: provided a running state but associated flow run 48478ce6-2b40-4ef6-98a1-f97957f6c9eb is not in a running state.'}}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1922, in set_task_run_state
    result = self.graphql(
  File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 570, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'message': 'State update failed for task run ID 9a5f3ce5-3239-410b-b665-08864644112f: provided a running state but associated flow run 48478ce6-2b40-4ef6-98a1-f97957f6c9eb is not in a running state.', 'locations': [{'line': 2, 'column': 5}], 'path': ['set_task_run_states'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'State update failed for task run ID 9a5f3ce5-3239-410b-b665-08864644112f: provided a running state but associated flow run 48478ce6-2b40-4ef6-98a1-f97957f6c9eb is not in a running state.'}}}]
One more thing I want to ask: how did you trigger the cancellation?
When I cancel in the UI it sends a Slack message, but not when I cancel from the Prefect client.
I changed the command to cancel_flow_run and now it works
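A short sketch of the switch described above, assuming a Prefect 1.x client where `Client.cancel_flow_run(flow_run_id)` requests cancellation instead of writing a final state directly. `request_cancellation` and `example_usage` are hypothetical names, and the client is injected so it can be replaced by a stub.

```python
from typing import Any


def request_cancellation(client: Any, flow_run_id: str) -> bool:
    """Request cancellation of a flow run via a Prefect 1.x-style client.

    Returns whether the backend reported the request as successful.
    """
    return bool(client.cancel_flow_run(flow_run_id))


def example_usage(flow_run_id: str) -> bool:
    """Illustrative only: needs Prefect 1.x and a reachable backend."""
    from prefect import Client

    return request_cancellation(Client(), flow_run_id)
```

In this thread, cancelling this way triggered the flow-level state handler, while setting the state directly through the client did not.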
Thanks a lot for all the help @Kevin Kho
k
Oh man did I forget to respond? Seems like you are good now?
n
Yep. I am all good