# ask-community
s
Hey Team, (Prefect version: 2.15.0) I need some help changing a flow run's state from Running to Cancelled. I've tried this using both the CLI and the API endpoint `set_state`. Both methods successfully change the state in the UI, but they also produce an error during the flow run. When I cancel via the UI, the run is cancelled gracefully with no error, and that is what I'm trying to replicate. I've tried:
• Moving the state to Cancelling, waiting 20 seconds, then moving to Cancelled
• Changing the `force` and `emit_event` API parameters
• Replicating what happens when you click the UI button. From what I can see, this only calls the Cancelled API endpoint but provides many more parameters; I'm unsure whether these trigger something further.
• Using try/catch blocks to gracefully handle the errors (these are ignored)
Here is an example of a flow run whose state I attempted to change to Cancelled. As you can see, the state was changed, but an error was produced:
12:19:55.864 | INFO    | prefect.engine - Created flow run 'hulking-anaconda' for flow 'test-flow-run'
12:19:55.866 | INFO    | Flow run 'hulking-anaconda' - View at <https://app.prefect.cloud/account/365adb4c-4945-4161-b530-3cf9e783fe00/workspace/55844168-8f9f-40df-ba5e-95fc94df7cae/flow-runs/flow-run/ab848a5b-3d03-4ec3-afff-89613f193a3b>
12:19:57.662 | INFO    | Flow run 'TEST_FLOW_RUN' - Created task run 'TASK 1: CANCEL FLOW RUN-0' for task 'TASK 1: CANCEL FLOW RUN'
12:19:57.665 | INFO    | Flow run 'TEST_FLOW_RUN' - Executing 'TASK 1: CANCEL FLOW RUN-0' immediately...
12:19:58.833 | INFO    | Task run 'TASK 1: CANCEL FLOW RUN-0' - Current Flow Id : ab848a5b-3d03-4ec3-afff-89613f193a3b
***** MADE API CALL HERE - REPONSE BELOW ******
{'state': {'id': 'efe9a76c-c378-4549-86f9-6cf320c0762b', 'type': 'CANCELLED', 'name': 'Cancelled', 'timestamp': '2024-12-30T02:19:59.516057Z', 'message': 'Run Cancelled', 'data': None, 'state_details': {'flow_run_id': 'ab848a5b-3d03-4ec3-afff-89613f193a3b', 'task_run_id': None, 'child_flow_run_id': None, 'scheduled_time': None, 'cache_key': None, 'cache_expiration': None, 'deferred': None, 'untrackable_result': False, 'pause_timeout': None, 'pause_reschedule': False, 'pause_key': None, 'run_input_keyset': None, 'refresh_cache': None, 'retriable': None, 'transition_id': None, 'task_parameters_id': None, 'traceparent': None}}, 'status': 'ACCEPT', 'details': {'type': 'accept_details'}}
12:19:59.988 | INFO    | Task run 'TASK 1: CANCEL FLOW RUN-0' - Finished in state Completed()
12:20:00.616 | ERROR   | Flow run 'hulking-anaconda' - Finished in state Cancelled('Run Cancelled')
Traceback (most recent call last):
  File "c:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\app_flows\test_flow_run\test_flow_run.py", line 111, in <module>
    test_flow_run()
  File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\flows.py", line 1128, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\engine.py", line 298, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\_internal\concurrency\api.py", line 265, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 285, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 169, in result      
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.2544.0_x64__qbz5n2kfra8p0\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 364, in _run_async  
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\client\utilities.py", line 51, in with_injected_client    
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\engine.py", line 401, in create_then_begin_flow_run       
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\sam.lawler\Visual Studio\Python Projects\prefect_test\venv\Lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
prefect.exceptions.CancelledRun: Run Cancelled
b
Hi Sam, I was able to get the following example to work for me without that exception popping up. Here's my long-running flow that is being cancelled:
from prefect import flow
from prefect import runtime
from prefect.logging.loggers import flow_run_logger
import time

def log_something(flow, flow_run, state):
    flow_run_logger(flow_run, flow).info(f"Flow {flow.name} cancelled")

@flow(on_cancellation=[log_something])
def test_flow():
    print(runtime.flow_run.id)
    time.sleep(60)
    return 42

if __name__=="__main__":
    test_flow.serve(name="my-cancelled-flow")
Here is the script that uses the client to set the flow run to a `Cancelling` state:
from prefect import get_client
from prefect.states import Cancelling
import asyncio

async def cancel_flow_run(flow_run_id: str):
    # Use the client as an async context manager so it is closed on exit
    async with get_client() as client:
        # First, set the state to Cancelling
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Cancelling(message="User requested cancellation")
        )

        # The flow run will detect this state and handle cleanup
        # You can optionally wait for the flow run to reach a final state
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state.is_final():
                break
            await asyncio.sleep(1)

# Usage
if __name__ == "__main__":
    asyncio.run(cancel_flow_run("057d7204-af88-42b1-9000-f0e5dc2424d2"))
Here's the flow run in the UI after cancellation:
Admittedly my example uses a secondary script to cancel the flow. Are you specifically looking to do that from within a task that is part of the flow you'd like to cancel?
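One note on the polling loop in the cancel script above: it waits indefinitely if the flow run never reaches a final state. Below is a minimal sketch of a bounded version. The helper name `wait_for_final_state` and its `timeout` and `interval` parameters are hypothetical (not part of Prefect); it only assumes a client object exposing the `read_flow_run` call and a state with `is_final()`, as used in the script above.

```python
import asyncio
import time
from typing import Any


async def wait_for_final_state(
    client: Any,
    flow_run_id: str,
    timeout: float = 120.0,
    interval: float = 1.0,
) -> bool:
    """Poll the flow run until its state is final or `timeout` seconds pass.

    Hypothetical helper: `client` is assumed to behave like the Prefect
    client in the script above (an async `read_flow_run` method).
    Returns True if a final state was seen, False if the wait timed out.
    """
    deadline = time.monotonic() + timeout
    while True:
        flow_run = await client.read_flow_run(flow_run_id)
        state = flow_run.state
        # A flow run may not have a state yet, so guard against None
        if state is not None and state.is_final():
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval)
```

Inside `cancel_flow_run`, this could replace the `while True` loop, for example `finished = await wait_for_final_state(client, flow_run_id, timeout=60)`, so the caller can decide what to do if the run is still not final after the timeout.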
s
That's awesome, thank you, @Bianca Hoch! I had been basing my code on the documentation, which I was not able to get 100% working: https://docs-2.prefect.io/2.15.0/api-ref/prefect/cli/flow_run/ I've used your code and now everything is working as expected. I'm able to cancel the flow run from within the task.
🙌 1
🚀 1