Tony Yun
09/20/2023, 9:52 PM
Tony Yun
09/20/2023, 9:53 PM
from prefect import flow, task
class CustomException(Exception):
    """Marker exception raised by the ``extract`` task in this repro."""
@task
def extract():
    """Task that always fails by raising ``CustomException``."""
    raise CustomException
@flow
def my_flow():
    """Call ``extract`` and swallow the ``CustomException`` it raises.

    In the reported run both messages below were printed, yet the flow run
    still finished in state ``Failed('1/1 states failed.')`` and the
    ``CustomException`` was re-raised to the caller of ``my_flow()``.
    NOTE(review): presumably this is because the flow returns ``None``, so
    Prefect derives the final flow state from the task-run states -- confirm
    against the Prefect documentation on final state determination.
    """
    try:
        extract()
    except CustomException:
        print("CustomException was raised")
    # Runs whether or not the exception was caught above.
    print("Flow should finish successfully but not raise CustomException")
# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    my_flow()
This is the flow output:
17:52:07.999 | INFO | prefect.engine - Created flow run 'greedy-finch' for flow 'my-flow'
17:52:08.147 | INFO | Flow run 'greedy-finch' - Created task run 'extract-0' for task 'extract'
17:52:08.148 | INFO | Flow run 'greedy-finch' - Executing 'extract-0' immediately...
17:52:08.197 | ERROR | Task run 'extract-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
result = await call.aresult()
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/task_exception_test.py", line 10, in extract
raise CustomException
CustomException
17:52:08.236 | ERROR | Task run 'extract-0' - Finished in state Failed('Task run encountered an exception: CustomException\n')
CustomException was raised
Flow should finish successfully but not raise CustomException
17:52:08.271 | ERROR | Flow run 'greedy-finch' - Finished in state Failed('1/1 states failed.')
Traceback (most recent call last):
File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/task_exception_test.py", line 24, in <module>
my_flow()
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/flows.py", line 468, in __call__
return enter_flow_run_engine_from_flow_call(
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 184, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 258, in create_then_begin_flow_run
return await state.result(fetch=True)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/task_exception_test.py", line 16, in my_flow
extract()
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/tasks.py", line 485, in __call__
return enter_task_run_engine(
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1137, in get_task_call_return_value
return await future._result()
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 241, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1551, in orchestrate_task_run
result = await call.aresult()
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/task_exception_test.py", line 10, in extract
raise CustomException
__main__.CustomException
Ryan Peden
09/20/2023, 10:28 PM
from prefect import flow, task
from prefect.states import Completed
class CustomException(Exception):
    """Marker exception raised by the ``extract`` task in this example."""
@task()
def extract():
    """Task that always fails by raising ``CustomException``."""
    raise CustomException
@flow
def my_flow():
    """Call ``extract``, handle its failure, and report the flow as completed.

    Returning an explicit ``Completed`` state tells Prefect to record the
    flow run as completed despite the failed task run. To also hand a value
    back to the caller, return ``Completed(data=...)`` instead.
    """
    try:
        extract()
    except CustomException:
        print("CustomException was raised")
    print("Flow should finish successfully but not raise CustomException")
    # Explicit terminal state: overrides the outcome Prefect would otherwise
    # record for a flow containing a failed task run.
    return Completed()
# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    my_flow()
That will tell Prefect to record the flow as completed despite the task failure. And if you need to return something from the flow, you could do `return Completed(data=your_return_value)`.