Ash

08/23/2023, 10:47 AM
Hi, I'm trying to get a flow to show as cancelled in the UI so have used
from prefect.server.schemas.states import Completed, Failed, Cancelled
with
return Cancelled(message="No records retreived.")
in my code somewhere. This still shows as Failed though, with the cancelled being part of the failed message?
10:43:06.764 | ERROR   | Flow run 'astonishing-hawk' - Finished in state Failed('Flow run encountered an exception. CancelledRun: No records retreived.')
Any idea if I'm missing something or if there's a better way?
Full message:
10:43:06.764 | ERROR   | Flow run 'astonishing-hawk' - Finished in state Failed('Flow run encountered an exception. CancelledRun: No records retreived.')
Traceback (most recent call last):
File "/root/flows/projects/inmotion_questions/inmotion_questions_etl.py", line 206, in <module>
etl_questions_data()
File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 511, in __call__
return enter_flow_run_engine_from_flow_call(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 272, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 375, in create_then_begin_flow_run
return await state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/flows/projects/inmotion_questions/inmotion_questions_etl.py", line 24, in etl_questions_data
questions_data = extract_questions(url, maximum_question_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/tasks.py", line 505, in __call__
return enter_task_run_engine(
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1137, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1302, in get_task_call_return_value
return await future._result()
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
prefect.exceptions.CancelledRun: No records retreived.

Jake Kaplan

08/23/2023, 1:49 PM
You should be able to do this:
from prefect import flow
from prefect.states import Cancelled

@flow
def my_flow():
    return Cancelled(message="The flow is cancelled")


if __name__ == '__main__':
    my_flow()
gives
09:47:13.652 | ERROR   | Flow run 'hypersonic-honeybee' - Finished in state Cancelled('The flow is cancelled')
...
prefect.exceptions.CancelledRun: The flow is cancelled
are you running your flow differently or with an option maybe?

Ash

08/23/2023, 1:50 PM
Ah so the cancelled part has to go in the flow itself, mine was in a task
Is there a way to trigger a cancel from a task, or is it a case of having to pass back something to the flow to tell it to cancel it there?

Jake Kaplan

08/23/2023, 2:00 PM
You could do something like this:
from prefect import flow, task
from prefect.states import Cancelled


@task
def my_task():
    return Cancelled(message="The task is cancelled")


@flow
def my_flow():
    state = my_task(return_state=True)
    if state.is_cancelled():
        return Cancelled(message="The flow is cancelled")


if __name__ == '__main__':
    my_flow()
Traditionally, Cancelled means something a little different: the flow run itself is cancelled first (not the task).

Ash

08/23/2023, 2:01 PM
Thanks @Jake Kaplan, I will look into this. I was attempting to cancel the rest of the flow (and tasks) if one of the prelim tasks fails

Jake Kaplan

08/23/2023, 2:04 PM
np! Do you need it to be Cancelled specifically? Normally Prefect treats things like that as failed:
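from prefect import flow, task

@task
def my_task():
    # An unhandled exception puts the task run in a Failed state
    raise Exception("didn't work")

@flow
def my_flow():
    # The exception propagates, so the flow run finishes as Failed too
    my_task()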

Ash

08/23/2023, 2:05 PM
It's more for the Prefect UI to show that something tried to run but didn't because there was nothing new to process, so Cancelled makes it a bit clearer when looking in the history. Failed makes it seem like something went wrong 🙂

Jake Kaplan

08/23/2023, 2:08 PM
Ah I see, makes sense. You might find this useful potentially: https://github.com/PrefectHQ/prefect/issues/10291#issuecomment-1682350743 best of luck!

Ash

08/23/2023, 2:09 PM
Oooo that looks even better, thank you!