https://prefect.io logo
#prefect-community
Title
# prefect-community
j

Jason Thomas

08/17/2022, 12:33 PM
Prefect
2.0.4
Hi all, I just upgraded from
2.0b8
to
2.0.4
. In
v2.0b8
errors within flows/tasks were caught by Prefect and the flow would continue, now in
2.0.4
they are being raised and crashing my run. How can I get back to the previous behavior? I’ll post my code in the thread
1
Code:
Copy code
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task 
def good_task():
    return True

@task()
def fail_task():
    raise Exception("Error raised for testing")

@flow()
def f1():
    fail_task()
    good_task()


if __name__ == '__main__':
    f1()
Logs in 2.0b8 (desired behavior):
Copy code
06:28:28.735 | INFO    | prefect.engine - Created flow run 'strict-cuttlefish' for flow 'f1'
06:28:28.735 | INFO    | Flow run 'strict-cuttlefish' - Using task runner 'ConcurrentTaskRunner'
06:28:30.433 | WARNING | Flow run 'strict-cuttlefish' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
06:28:31.470 | INFO    | Flow run 'strict-cuttlefish' - Created task run 'fail_task-d8c73196-0' for task 'fail_task'
06:28:32.491 | INFO    | Flow run 'strict-cuttlefish' - Created task run 'good_task-2a08f303-0' for task 'good_task'
06:28:33.921 | ERROR   | Task run 'fail_task-d8c73196-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/engine.py", line 890, in orchestrate_task_run
    result = await run_sync_in_interruptible_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "_test.py", line 12, in fail_task
    raise Exception("Error raised for testing")
Exception: Error raised for testing
06:28:34.943 | ERROR   | Task run 'fail_task-d8c73196-0' - Finished in state Failed('Task run encountered an exception.')
06:28:35.973 | INFO    | Task run 'good_task-2a08f303-0' - Finished in state Completed()
06:28:36.930 | ERROR   | Flow run 'strict-cuttlefish' - Finished in state Failed('1/2 states failed.')
Logs in 2.0.4 (undesired behavior):
Copy code
06:27:00.077 | INFO    | prefect.engine - Created flow run 'witty-hippo' for flow 'f1'
06:27:00.219 | INFO    | Flow run 'witty-hippo' - Created task run 'fail_task-d8c73196-0' for task 'fail_task'
06:27:01.855 | INFO    | Flow run 'witty-hippo' - Executing 'fail_task-d8c73196-0' immediately...
06:27:02.830 | ERROR   | Task run 'fail_task-d8c73196-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 1053, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "_test.py", line 12, in fail_task
    raise Exception("Error raised for testing")
Exception: Error raised for testing
06:27:03.837 | ERROR   | Task run 'fail_task-d8c73196-0' - Finished in state Failed('Task run encountered an exception.')
06:27:04.852 | ERROR   | Flow run 'witty-hippo' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "_test.py", line 16, in f1
    fail_task()
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 687, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 806, in create_task_run_then_submit
    return await future._result()
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/futures.py", line 220, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 1053, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "_test.py", line 12, in fail_task
    raise Exception("Error raised for testing")
Exception: Error raised for testing
06:27:05.831 | ERROR   | Flow run 'witty-hippo' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "_test.py", line 21, in <module>
    f1()
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
    return state.result()
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "_test.py", line 16, in f1
    fail_task()
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 687, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 806, in create_task_run_then_submit
    return await future._result()
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/futures.py", line 220, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 1053, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "_test.py", line 12, in fail_task
    raise Exception("Error raised for testing")
Exception: Error raised for testing
s

Serina

08/17/2022, 12:59 PM
I can replicate the
1/2 states failed/finished
state that I believe you are looking for:
Copy code
08:56:39.574 | ERROR   | Task run 'fail_task-d8c73196-0' - Finished in state Failed('Task run encountered an exception.')
08:56:39.587 | INFO    | Task run 'good_task-2a08f303-0' - Finished in state Completed()
08:56:39.612 | ERROR   | Flow run 'brown-mantis' - Finished in state Failed('1/2 states failed.')
by modifying the code like so:
Copy code
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task 
def good_task():
    return True

@task()
def fail_task():
    raise Exception("Error raised for testing")

@flow()
def f1():
    fail_task.submit()
    good_task.submit()

if __name__ == '__main__':
    f1()
ConcurrentTaskRunner is no longer the default like it was in 2.0b8
j

Jason Thomas

08/17/2022, 1:12 PM
Thanks @Serina, that helps a lot. It is now running the second task for me and reaching a finished state. But it is still raising the error after the flow runs. How do I get it to not raise on failure?
s

Serina

08/17/2022, 1:28 PM
Since your flow finishes in a Failed state and since you are creating an Exception in fail_task() upon failure, it will raise the message you wrote within it, where in this case data is “Error raised for testing.” In the UI for the code above I see these logs:
Is that different than what you would expect to see?
j

Jason Thomas

08/17/2022, 1:30 PM
I want the error to be caught and logged, and execution to continue.
s

Serina

08/17/2022, 1:34 PM
Then you might consider using the logger instead of raising
Copy code
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.logging import get_run_logger

@task 
def good_task():
    return True

@task()
def fail_task():
    logger = get_run_logger()
    if 2 < 4:
        logger.error(msg="A task failed")

@flow(task_runner=ConcurrentTaskRunner)
def f1():
    fail_task.submit()
    good_task.submit()

if __name__ == '__main__':
    f1()
which would give you the following output:
Copy code
09:32:53.920 | INFO    | prefect.engine - Created flow run 'cerulean-bumblebee' for flow 'f1'
09:32:54.030 | INFO    | Flow run 'cerulean-bumblebee' - Created task run 'fail_task-d8c73196-0' for task 'fail_task'
09:32:54.030 | INFO    | Flow run 'cerulean-bumblebee' - Submitted task run 'fail_task-d8c73196-0' for execution.
09:32:54.054 | INFO    | Flow run 'cerulean-bumblebee' - Created task run 'good_task-2a08f303-0' for task 'good_task'
09:32:54.055 | INFO    | Flow run 'cerulean-bumblebee' - Submitted task run 'good_task-2a08f303-0' for execution.
09:32:54.063 | ERROR   | Task run 'fail_task-d8c73196-0' - A task failed
09:32:54.082 | INFO    | Task run 'fail_task-d8c73196-0' - Finished in state Completed()
09:32:54.088 | INFO    | Task run 'good_task-2a08f303-0' - Finished in state Completed()
09:32:54.103 | INFO    | Flow run 'cerulean-bumblebee' - Finished in state Completed('All states completed.')
With the logs in the UI:
j

Jason Thomas

08/17/2022, 1:36 PM
Here’s a modification to my sample code:
Copy code
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task 
def good_task():
    return True

@task()
def fail_task():
    raise Exception("Error raised for testing")

@flow()
def f1():
    fail_task()
    good_task()

@flow 
def f2():
    good_task()

if __name__ == '__main__':
    f1()
    f2()
    print('********  all flows complete  ***********')
And the desired output:
Copy code
07:33:50.501 | INFO    | prefect.engine - Created flow run 'purring-vole' for flow 'f1'
07:33:50.501 | INFO    | Flow run 'purring-vole' - Using task runner 'ConcurrentTaskRunner'
07:33:52.031 | WARNING | Flow run 'purring-vole' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
07:33:53.052 | INFO    | Flow run 'purring-vole' - Created task run 'fail_task-d8c73196-0' for task 'fail_task'
07:33:54.069 | INFO    | Flow run 'purring-vole' - Created task run 'good_task-2a08f303-0' for task 'good_task'
07:33:55.150 | ERROR   | Task run 'fail_task-d8c73196-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/engine.py", line 890, in orchestrate_task_run
    result = await run_sync_in_interruptible_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "_test.py", line 10, in fail_task
    raise Exception("Error raised for testing")
Exception: Error raised for testing
07:33:56.134 | ERROR   | Task run 'fail_task-d8c73196-0' - Finished in state Failed('Task run encountered an exception.')
07:33:57.167 | INFO    | Task run 'good_task-2a08f303-0' - Finished in state Completed()
07:33:58.164 | ERROR   | Flow run 'purring-vole' - Finished in state Failed('1/2 states failed.')
07:33:59.380 | INFO    | prefect.engine - Created flow run 'impartial-hummingbird' for flow 'f2'
07:33:59.380 | INFO    | Flow run 'impartial-hummingbird' - Using task runner 'ConcurrentTaskRunner'
07:34:00.355 | WARNING | Flow run 'impartial-hummingbird' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
07:34:01.420 | INFO    | Flow run 'impartial-hummingbird' - Created task run 'good_task-2a08f303-0' for task 'good_task'
07:34:02.485 | INFO    | Task run 'good_task-2a08f303-0' - Finished in state Completed()
07:34:03.606 | INFO    | Flow run 'impartial-hummingbird' - Finished in state Completed('All states completed.')
********  all flows complete  ***********
It’s not just anticipated errors that I want to catch, it’s any error. I just want my flows to run without crashing.
s

Serina

08/17/2022, 2:18 PM
Hi @Jason Thomas, can you see if this produces the results you’re looking for?
Copy code
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task 
def good_task():
    return True

@task()
def fail_task():
    raise Exception("Error raised for testing")

@flow()
def f1():
    fail_task()
    good_task()

@flow 
def f2():
    good_task()

if __name__ == '__main__':
    f1(return_state=True)
    f2(return_state=True)
    print('********  all flows complete  ***********')
j

Jason Thomas

08/17/2022, 2:23 PM
That worked on the sample. Testing on something more complex…
Yes, that is working. Thank you @Serina!!
🙌 1