Hi, I noticed if raising an exception in a task an...
# ask-community
t
Hi, I noticed that if I raise an exception in a task and catch it in the flow, the flow run still fails and re-raises that exception at the end. Is there a way to avoid it being raised again? Please see the thread for a code example.
Copy code
from prefect import flow, task


# Exception type used by this example: raised inside the task and caught in the flow.
class CustomException(Exception):
    pass


# Task that unconditionally raises CustomException, so every run of it fails.
@task
def extract():
    raise CustomException


# Flow that calls the failing task, catches its exception, and then returns
# nothing explicitly.
@flow
def my_flow():
    try:
        extract()
    except CustomException:
        # The task's exception is handled here rather than propagated.
        print("CustomException was raised")

    print("Flow should finish successfully but not raise CustomException")


# Run the flow when this file is executed directly as a script.
if __name__ == "__main__":
    my_flow()
This is the flow output:
Copy code
17:52:07.999 | INFO    | prefect.engine - Created flow run 'greedy-finch' for flow 'my-flow'
17:52:08.147 | INFO    | Flow run 'greedy-finch' - Created task run 'extract-0' for task 'extract'
17:52:08.148 | INFO    | Flow run 'greedy-finch' - Executing 'extract-0' immediately...
17:52:08.197 | ERROR   | Task run 'extract-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/task_exception_test.py", line 10, in extract
    raise CustomException
CustomException
17:52:08.236 | ERROR   | Task run 'extract-0' - Finished in state Failed('Task run encountered an exception: CustomException\n')
CustomException was raised
Flow should finish successfully but not raise CustomException
17:52:08.271 | ERROR   | Flow run 'greedy-finch' - Finished in state Failed('1/1 states failed.')
Traceback (most recent call last):
  File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/task_exception_test.py", line 24, in <module>
    my_flow()
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 184, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 258, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/task_exception_test.py", line 16, in my_flow
    extract()
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/tasks.py", line 485, in __call__
    return enter_task_run_engine(
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
    return await future._result()
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/task_exception_test.py", line 10, in extract
    raise CustomException
__main__.CustomException
r
Does this do what you need?
Copy code
from prefect import flow, task
from prefect.states import Completed


# Exception type used by this example: raised inside the task and caught in the flow.
class CustomException(Exception):
    pass


# Task that unconditionally raises CustomException, so every run of it fails.
@task()
def extract():
    raise CustomException


# Flow that calls the failing task, catches its exception, and explicitly
# returns a Completed state.
@flow
def my_flow():
    try:
        extract()
    except CustomException:
        # The task's exception is handled here rather than propagated.
        print("CustomException was raised")

    print("Flow should finish successfully but not raise CustomException")
    # Returning an explicit Completed state tells Prefect to record the flow
    # as completed despite the task failure.
    return Completed()


# Run the flow when this file is executed directly as a script.
if __name__ == "__main__":
    my_flow()
That will tell Prefect to record the flow as completed despite the task failure. And if you need to return something from the flow, you could do
return Completed(data=your_return_value)
536 Views