Matt Alhonte
04/13/2022, 6:39 PMAnna Geller
04/13/2022, 6:44 PMMatt Alhonte
04/13/2022, 6:47 PMAnna Geller
04/13/2022, 6:50 PMMatt Alhonte
04/13/2022, 6:53 PMAnna Geller
04/13/2022, 6:54 PMMatt Alhonte
04/13/2022, 6:54 PMAnna Geller
04/13/2022, 6:58 PMquery {
flow_run(
where: {_and: [{flow_id: {_eq: "eb45ed0a-a76c-445b-b0b8-1fdc05c1f54c"}},
{state: {_eq: "Submitted"}}]}
) {
id
}
}
just manually look up the flow-id from the flow page 😛 easier than doing nested queriesMatt Alhonte
04/13/2022, 6:59 PMAnna Geller
04/13/2022, 7:00 PMfrom prefect import Client
client = Client()
list_of_flow_run_ids_to_cancel = [] # result of the query
for id_ in list_of_flow_run_ids_to_cancel:
client.cancel_flow_run(flow_run_id=id_)
Matt Alhonte
04/13/2022, 8:49 PMfrom prefect.client import Client
import prefect
import time
def cancel_runs_for_flow(version_group_id: str) -> None:
c = Client()
query = c.graphql(
"\n".join(
[
"query {",
" flow_run(where: {",
" _and: {",
f' flow: {{version_group_id: {{_eq: "{version_group_id}"}}}}',
" state: {",
" _in: [",
' "Pending", "Running", "Submitted"]',
" }",
" }",
" }",
" ) {",
" id",
" }}",
]
)
)
result = query.get("data").get("flow_run")
id_list = list()
print("Fetched flows")
for i in result:
flow_id = i["id"]
state = prefect.engine.state.Cancelled()
version = None
print(i)
time.sleep(1)
c.set_flow_run_state(flow_run_id=flow_id, state=state)
Anna Geller
04/13/2022, 8:53 PM