Noam polak
03/16/2022, 12:35 PMKevin Kho
def mystatehandler(obj, old_state, new_state):
if isinstance(new_state, Cancelled):
...
Kevin Kho
Noam polak
03/21/2022, 1:04 PMdef set_flow_run_state_to_fail(
flow_run_id: str,
) -> str:
#get the version id
query = """
query QueryFlowRun($flowRunId: uuid!){
flow_run_by_pk(id: $flowRunId) {
version
}
}
"""
request_data = {
"query": query,
"variables": {"flowRunId": flow_run_id}
}
resp = <http://requests.post|requests.post>(PREFECT_URL, json=request_data)
resp.raise_for_status()
version = resp.json()["data"]['flow_run_by_pk']["version"]
query = """
mutation($states: set_flow_run_states_input!){
set_flow_run_states(input: $states) {
states {
id
}
}
}
"""
input_params = [{
"state": json.dumps({
"type": "Failed",
"message": "Forced failure",
}),
"version": version,
"flow_run_id": flow_run_id,
}]
<http://logger.info|logger.info>(input_params)
resp = <http://requests.post|requests.post>(PREFECT_URL, json={
"query": query,
"variables": {"states":input_params}
})
resp.raise_for_status()
return resp.json()["data"]["states"]["id"]
What can be the problem here
thanks
NoamKevin Kho
Client.set_flow_run_state
, or you can see it’s source code here because it takes some formattingNoam polak
03/21/2022, 3:01 PMKevin Kho
Noam polak
03/21/2022, 3:15 PMdef post_to_slack_handler_fq(flow: Flow, old_state: State, new_state: State) -> State:
if any([new_state.is_failed(), new_state.is_successful(), new_state.is_running(), new_state.is_pending(), isinstance(new_state, Cancelled)]):
_post_to_slack_handler_fq(new_state, None)
return new_state
Noam polak
03/21/2022, 3:15 PMKevin Kho
Kevin Kho
Kevin Kho
Kevin Kho
Kevin Kho
Noam polak
03/22/2022, 7:33 AMNoam polak
03/22/2022, 7:34 AMNoam polak
03/22/2022, 7:36 AMNoam polak
03/22/2022, 8:20 AMNoam polak
03/24/2022, 7:31 AMKevin Kho
Noam polak
03/27/2022, 7:31 AM