Ash
08/23/2023, 10:47 AM
from prefect.server.schemas.states import Completed, Failed, Cancelled
with
return Cancelled(message="No records retreived.")
in my code somewhere. This still shows as Failed though, with the cancelled being part of the failed message?
10:43:06.764 | ERROR | Flow run 'astonishing-hawk' - Finished in state Failed('Flow run encountered an exception. CancelledRun: No records retreived.')
Any idea if I'm missing something or if there's a better way?
10:43:06.764 | ERROR | Flow run 'astonishing-hawk' - Finished in state Failed('Flow run encountered an exception. CancelledRun: No records retreived.')
Traceback (most recent call last):
File "/root/flows/projects/inmotion_questions/inmotion_questions_etl.py", line 206, in <module>
etl_questions_data()
File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 511, in __call__
return enter_flow_run_engine_from_flow_call(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 272, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 375, in create_then_begin_flow_run
return await state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/flows/projects/inmotion_questions/inmotion_questions_etl.py", line 24, in etl_questions_data
questions_data = extract_questions(url, maximum_question_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/tasks.py", line 505, in __call__
return enter_task_run_engine(
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1137, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1302, in get_task_call_return_value
return await future._result()
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
prefect.exceptions.CancelledRun: No records retreived.
Jake Kaplan
08/23/2023, 1:49 PM
from prefect import flow
from prefect.states import Cancelled
@flow
def my_flow():
    """Minimal flow that ends by returning a ``Cancelled`` state."""
    # The state is returned (not raised) so it becomes the flow's return value.
    return Cancelled(message="The flow is cancelled")


if __name__ == '__main__':
    my_flow()
gives
09:47:13.652 | ERROR | Flow run 'hypersonic-honeybee' - Finished in state Cancelled('The flow is cancelled')
...
prefect.exceptions.CancelledRun: The flow is cancelled
Ash
08/23/2023, 1:50 PM
Jake Kaplan
08/23/2023, 2:00 PM
from prefect import flow, task
from prefect.states import Cancelled
@task
def my_task():
    """Task that returns a ``Cancelled`` state instead of a plain value."""
    return Cancelled(message="The task is cancelled")


@flow
def my_flow():
    """Run ``my_task`` and return a ``Cancelled`` state if the task was cancelled.

    When the task state is not cancelled the function falls through and
    implicitly returns ``None``.
    """
    # return_state=True: work with the task's state object so it can be
    # inspected with is_cancelled() below.
    state = my_task(return_state=True)
    if state.is_cancelled():
        return Cancelled(message="The flow is cancelled")


if __name__ == '__main__':
    my_flow()
Ash
08/23/2023, 2:01 PM
Jake Kaplan
08/23/2023, 2:04 PM
`Cancelled` specifically? Normally Prefect treats things like that as failed:
@task
def my_task():
    """Task that always fails by raising an exception."""
    raise Exception("didn't work")


@flow
def my_flow():
    """Flow that calls the always-failing ``my_task``."""
    my_task()
Ash
08/23/2023, 2:05 PM
Jake Kaplan
08/23/2023, 2:08 PM
Ash
08/23/2023, 2:09 PM