Igor
09/28/2024, 5:49 PMMarvin
09/28/2024, 5:50 PMprefect agent startIgor
09/28/2024, 5:51 PMMarvin
09/28/2024, 5:51 PMCOMPLETEDFAILEDCRASHEDKeyboardInterruptSIGTERMCRASHEDRUNNINGprefect flow-run canceltimeout_secondsTimedOutpython
     from prefect import flow
     import time
     @flow(timeout_seconds=1, log_prints=True)
     def show_timeouts():
         print("I will execute")
         time.sleep(5)
         print("I will not execute")
CRASHED
Igor
09/28/2024, 6:00 PMMarvin
09/28/2024, 6:01 PMsh
   prefect flow-run cancel 'your-flow-run-id'sh
   prefect flow-run cancel 'a55a4804-9e3c-4042-8b59-b3b6b7618736'python
   from prefect.client.orchestration import get_client
   from prefect.states import StateType
   import anyio
   async def list_flow_runs_with_states(states):
       """Return the flow runs whose current state type is one of ``states``.

       Args:
           states: Iterable of ``StateType`` members to match against each
               flow run's state type.

       Returns:
           The flow runs returned by ``client.read_flow_runs`` for that filter.
           NOTE(review): the API applies its own page limit, so this may be a
           subset of all matching runs; callers that need every run should
           call again until the result is empty.
       """
       # Imported at function scope so this snippet works without touching
       # the import header of the surrounding script.
       from prefect.client.schemas.filters import (
           FlowRunFilter,
           FlowRunFilterState,
           FlowRunFilterStateType,
       )

       # read_flow_runs() has no `state_types` keyword; filtering by state is
       # expressed through a FlowRunFilter passed as `flow_run_filter`.
       state_filter = FlowRunFilter(
           state=FlowRunFilterState(
               type=FlowRunFilterStateType(any_=list(states)),
           ),
       )
       async with get_client() as client:
           return await client.read_flow_runs(flow_run_filter=state_filter)
   async def cancel_flow_runs(flow_runs):
       """Set every flow run in ``flow_runs`` to a ``Cancelled`` state.

       For each run, a copy of its current state is made with the name
       replaced by ``"Cancelled"`` and the type by ``StateType.CANCELLED``,
       and that copy is submitted with ``force=True``. A progress line is
       printed per run.

       Args:
           flow_runs: Iterable of flow-run objects exposing ``name``, ``id``
               and ``state``.
       """
       async with get_client() as client:
           for idx, flow_run in enumerate(flow_runs):
               print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
               # Derive the new state from the existing one so every field
               # other than name/type is carried over unchanged.
               cancelled_state = flow_run.state.copy(
                   update=dict(name="Cancelled", type=StateType.CANCELLED)
               )
               await client.set_flow_run_state(
                   flow_run.id,
                   cancelled_state,
                   force=True,
               )
   async def bulk_cancel_flow_runs():
       """Cancel every flow run that is pending, running or scheduled.

       Repeatedly lists matching flow runs and cancels them until a listing
       comes back empty, printing the batch size before each round and
       ``Done!`` at the end.
       """
       # "Late" is a state *name*, not a StateType member: late runs have type
       # SCHEDULED, so they are already covered here. Referencing
       # StateType.LATE would raise AttributeError before any run is cancelled.
       states = [StateType.PENDING, StateType.RUNNING, StateType.SCHEDULED]
       flow_runs = await list_flow_runs_with_states(states)
       # Loop because one listing call may not return every matching run.
       while flow_runs:
           print(f"Cancelling {len(flow_runs)} flow runs\n")
           await cancel_flow_runs(flow_runs)
           flow_runs = await list_flow_runs_with_states(states)
       print("Done!")
   if __name__ == "__main__":
       anyio.run(bulk_cancel_flow_runs)
Andrew Brookins
09/28/2024, 8:58 PMIgor
09/29/2024, 3:08 PM