John Mizerany
10/11/2022, 2:30 PM
Bianca Hoch
10/11/2022, 4:07 PM
from prefect.client import Client
from prefect.engine.state import Cancelled
import time


def cancel_runs_for_flow(version_group_id: str) -> None:
    """Move flow runs stuck in "Cancelling" to "Cancelled" for one flow version group."""
    c = Client()
    # Find every run of this flow that is still in the Cancelling state.
    query = c.graphql(
        "\n".join(
            [
                "query {",
                "  flow_run(where: {",
                "    _and: {",
                f'      flow: {{version_group_id: {{_eq: "{version_group_id}"}}}}',
                "      state: {",
                "        _in: [",
                '          "Cancelling"]',
                "      }",
                "    }",
                "  }",
                "  ) {",
                "    id",
                "  }}",
            ]
        )
    )
    result = query.get("data").get("flow_run")
    print("Fetched flow runs")
    for i in result:
        flow_run_id = i["id"]
        print(i)
        time.sleep(1)  # brief pause between API calls
        c.set_flow_run_state(flow_run_id=flow_run_id, state=Cancelled())
John Mizerany
10/11/2022, 4:11 PM
Bianca Hoch
10/11/2022, 4:17 PM
timeout argument in the task decorator. This will fail a task if it takes longer than the specified time (in seconds).
from prefect import task

# some_action_on_failed is a state handler defined elsewhere
@task(timeout=300, state_handlers=[some_action_on_failed])
def some_task():
    pass
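For reference, a minimal sketch of what a handler like some_action_on_failed might look like. It assumes the Prefect 1.x state handler convention, where the handler receives the task, the old state, and the new state, and returns the state the task should end in. The failure_log list is hypothetical and stands in for whatever real action (alert, cleanup, and so on) you want to take.

```python
from typing import Any, List

# Hypothetical record of failures, standing in for a real action (alert, cleanup, ...).
failure_log: List[str] = []


def some_action_on_failed(task: Any, old_state: Any, new_state: Any) -> Any:
    """State handler sketch: runs on every state change of the task."""
    # Assumes Prefect 1.x state objects, which expose is_failed() and .message.
    # A task that exceeds its timeout should land in a failed state.
    if new_state.is_failed():
        name = getattr(task, "name", str(task))
        failure_log.append(f"{name} failed: {new_state.message}")
    # Return the new state unchanged so the run proceeds as it normally would.
    return new_state
```

The handler only reacts when the new state reports failure, so retries and successful runs pass through untouched.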
John Mizerany
10/11/2022, 5:39 PM