Sean Conroy
06/07/2023, 7:59 PMDeceivious
06/08/2023, 7:57 AMGhislain Picard
06/08/2023, 11:30 AMfrom prefect.client import get_client
from prefect.states import State
from prefect.server.schemas.states import StateType
from prefect.server.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
from prefect.exceptions import ObjectNotFound
async def cancel(running_only):
if running_only:
statefilter = FlowRunFilter(state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.RUNNING])))
else:
statefilter = None
async with get_client() as client:
offset = 0
flows = await client.read_flow_runs(offset=offset, flow_run_filter=statefilter)
while flows:
# limit = datetime.datetime.now() - timedelta(seconds=timeout)
for flow in flows:
#print(flow.state_name)
print(flow.start_time)
cancelling_state = State(type=StateType.CANCELLING)
try:
result = await client.set_flow_run_state(flow_run_id=flow.id, state=cancelling_state)
except ObjectNotFound:
raise Exception(f"Flow run '{id}' not found!")
result
offset += len(flows)
flows = await client.read_flow_runs(offset=offset, flow_run_filter=statefilter)
Sean Conroy
06/08/2023, 2:10 PM