Sam Lawler
12/30/2024, 2:25 AMset_state
. Both of these methods successfully changes the state in UI however this also results in producing an error during the flow run. Whereas when i cancel via the UI, this cancels the run gracefully, not producing an error - this is what I'm trying to replicate.
I've tried:
• Moving the state to Cancelling then waiting 20 seconds, then moving to Cancelled
• Changing the force
and emit_event
API parameters
• Replicating what happens when you click on the UI button. From what I can see this is only calling the Cancelled API endpoint, but is providing many more parameters, unsure if these are triggering something further.
• Tried using try catch blocks to gracefully handle the errors - these are ignored
Here is an example of a flow run which the state was attempted to be changed to Cancelled. As you can see, the state was changed, but produced an error:
12:19:55.864 | INFO | prefect.engine - Created flow run 'hulking-anaconda' for flow 'test-flow-run'
12:19:55.866 | INFO | Flow run 'hulking-anaconda' - View at <https://app.prefect.cloud/account/365adb4c-4945-4161-b530-3cf9e783fe00/workspace/55844168-8f9f-40df-ba5e-95fc94df7cae/flow-runs/flow-run/ab848a5b-3d03-4ec3-afff-89613f193a3b>
12:19:57.662 | INFO | Flow run 'TEST_FLOW_RUN' - Created task run 'TASK 1: CANCEL FLOW RUN-0' for task 'TASK 1: CANCEL FLOW RUN'
12:19:57.665 | INFO | Flow run 'TEST_FLOW_RUN' - Executing 'TASK 1: CANCEL FLOW RUN-0' immediately...
12:19:58.833 | INFO | Task run 'TASK 1: CANCEL FLOW RUN-0' - Current Flow Id : ab848a5b-3d03-4ec3-afff-89613f193a3b
***** MADE API CALL HERE - REPONSE BELOW ******
{'state': {'id': 'efe9a76c-c378-4549-86f9-6cf320c0762b', 'type': 'CANCELLED', 'name': 'Cancelled', 'timestamp': '2024-12-30T02:19:59.516057Z', 'message': 'Run Cancelled', 'data': None, 'state_details': {'flow_run_id': 'ab848a5b-3d03-4ec3-afff-89613f193a3b', 'task_run_id': None, 'child_flow_run_id': None, 'scheduled_time': None, 'cache_key': None, 'cache_expiration': None, 'deferred': None, 'untrackable_result': False, 'pause_timeout': None, 'pause_reschedule': False, 'pause_key': None, 'run_input_keyset': None, 'refresh_cache': None, 'retriable': None, 'transition_id': None, 'task_parameters_id': None, 'traceparent': None}}, 'status': 'ACCEPT', 'details': {'type': 'accept_details'}}
12:19:59.988 | INFO | Task run 'TASK 1: CANCEL FLOW RUN-0' - Finished in state Completed()
12:20:00.616 | ERROR | Flow run 'hulking-anaconda' - Finished in state Cancelled('Run Cancelled')
Traceback (most recent call last):
File "c:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\app_flows\test_flow_run\test_flow_run.py", line 111, in <module>
test_flow_run()
File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\flows.py", line 1128, in __call__
return enter_flow_run_engine_from_flow_call(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\engine.py", line 298, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\_internal\concurrency\api.py", line 265, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 285, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 169, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.2544.0_x64__qbz5n2kfra8p0\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 364, in _run_async
result = await coro
^^^^^^^^^^
File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\client\utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\engine.py", line 401, in create_then_begin_flow_run
return await state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\states.py", line 91, in _get_state_result
raise await get_state_exception(state)
prefect.exceptions.CancelledRun: Run Cancelled
Bianca Hoch
12/31/2024, 5:36 PMfrom prefect import flow
from prefect import runtime
from prefect.logging.loggers import flow_run_logger
import time
def log_something(flow, flow_run, state):
flow_run_logger(flow_run, flow).info(f"Flow {flow.name} cancelled")
@flow(on_cancellation=[log_something])
def test_flow():
print(runtime.flow_run.id)
time.sleep(60)
return 42
if __name__=="__main__":
test_flow.serve(name="my-cancelled-flow")
Here is the script that uses the client to set the flow run to a Cancelling
state:
from prefect import get_client
from prefect.states import Cancelling
import asyncio
async def cancel_flow_run(flow_run_id: str):
client = get_client()
# First, set the state to Cancelling
await client.set_flow_run_state(
flow_run_id=flow_run_id,
state=Cancelling(message="User requested cancellation")
)
# The flow run will detect this state and handle cleanup
# You can optionally wait for the flow run to complete
while True:
flow_run = await client.read_flow_run(flow_run_id)
if flow_run.state.is_final():
break
await asyncio.sleep(1)
# Usage
if __name__ == "__main__":
asyncio.run(cancel_flow_run("057d7204-af88-42b1-9000-f0e5dc2424d2"))
Bianca Hoch
12/31/2024, 5:37 PMBianca Hoch
12/31/2024, 5:38 PMSam Lawler
01/02/2025, 4:53 AM