Jason Thomas
08/17/2022, 12:33 PM2.0.4
Hi all, I just upgraded from 2.0b8
to 2.0.4
.
In v2.0b8
errors within flows/tasks were caught by Prefect and the flow would continue, now in 2.0.4
they are being raised and crashing my run. How can I get back to the previous behavior?
I’ll post my code in the threadfrom prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
@task
def good_task():
return True
@task()
def fail_task():
raise Exception("Error raised for testing")
@flow()
def f1():
fail_task()
good_task()
if __name__ == '__main__':
f1()
06:28:28.735 | INFO | prefect.engine - Created flow run 'strict-cuttlefish' for flow 'f1'
06:28:28.735 | INFO | Flow run 'strict-cuttlefish' - Using task runner 'ConcurrentTaskRunner'
06:28:30.433 | WARNING | Flow run 'strict-cuttlefish' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
06:28:31.470 | INFO | Flow run 'strict-cuttlefish' - Created task run 'fail_task-d8c73196-0' for task 'fail_task'
06:28:32.491 | INFO | Flow run 'strict-cuttlefish' - Created task run 'good_task-2a08f303-0' for task 'good_task'
06:28:33.921 | ERROR | Task run 'fail_task-d8c73196-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/engine.py", line 890, in orchestrate_task_run
result = await run_sync_in_interruptible_worker_thread(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "_test.py", line 12, in fail_task
raise Exception("Error raised for testing")
Exception: Error raised for testing
06:28:34.943 | ERROR | Task run 'fail_task-d8c73196-0' - Finished in state Failed('Task run encountered an exception.')
06:28:35.973 | INFO | Task run 'good_task-2a08f303-0' - Finished in state Completed()
06:28:36.930 | ERROR | Flow run 'strict-cuttlefish' - Finished in state Failed('1/2 states failed.')
06:27:00.077 | INFO | prefect.engine - Created flow run 'witty-hippo' for flow 'f1'
06:27:00.219 | INFO | Flow run 'witty-hippo' - Created task run 'fail_task-d8c73196-0' for task 'fail_task'
06:27:01.855 | INFO | Flow run 'witty-hippo' - Executing 'fail_task-d8c73196-0' immediately...
06:27:02.830 | ERROR | Task run 'fail_task-d8c73196-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 1053, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "_test.py", line 12, in fail_task
raise Exception("Error raised for testing")
Exception: Error raised for testing
06:27:03.837 | ERROR | Task run 'fail_task-d8c73196-0' - Finished in state Failed('Task run encountered an exception.')
06:27:04.852 | ERROR | Flow run 'witty-hippo' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "_test.py", line 16, in f1
fail_task()
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/tasks.py", line 294, in __call__
return enter_task_run_engine(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 687, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 806, in create_task_run_then_submit
return await future._result()
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/futures.py", line 220, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 1053, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "_test.py", line 12, in fail_task
raise Exception("Error raised for testing")
Exception: Error raised for testing
06:27:05.831 | ERROR | Flow run 'witty-hippo' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
File "_test.py", line 21, in <module>
f1()
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
return state.result()
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "_test.py", line 16, in f1
fail_task()
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/tasks.py", line 294, in __call__
return enter_task_run_engine(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 687, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 806, in create_task_run_then_submit
return await future._result()
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/futures.py", line 220, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/engine.py", line 1053, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_2.0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "_test.py", line 12, in fail_task
raise Exception("Error raised for testing")
Exception: Error raised for testing
Serina
08/17/2022, 12:59 PM1/2 states failed/finished
state that I believe you are looking for:
08:56:39.574 | ERROR | Task run 'fail_task-d8c73196-0' - Finished in state Failed('Task run encountered an exception.')
08:56:39.587 | INFO | Task run 'good_task-2a08f303-0' - Finished in state Completed()
08:56:39.612 | ERROR | Flow run 'brown-mantis' - Finished in state Failed('1/2 states failed.')
by modifying the code like so:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
@task
def good_task():
return True
@task()
def fail_task():
raise Exception("Error raised for testing")
@flow()
def f1():
fail_task.submit()
good_task.submit()
if __name__ == '__main__':
f1()
ConcurrentTaskRunner is no longer the default like it was in 2.0b8Jason Thomas
08/17/2022, 1:12 PMSerina
08/17/2022, 1:28 PMJason Thomas
08/17/2022, 1:30 PMSerina
08/17/2022, 1:34 PMfrom prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.logging import get_run_logger
@task
def good_task():
return True
@task()
def fail_task():
logger = get_run_logger()
if 2 < 4:
logger.error(msg="A task failed")
@flow(task_runner=ConcurrentTaskRunner)
def f1():
fail_task.submit()
good_task.submit()
if __name__ == '__main__':
f1()
which would give you the following output:
09:32:53.920 | INFO | prefect.engine - Created flow run 'cerulean-bumblebee' for flow 'f1'
09:32:54.030 | INFO | Flow run 'cerulean-bumblebee' - Created task run 'fail_task-d8c73196-0' for task 'fail_task'
09:32:54.030 | INFO | Flow run 'cerulean-bumblebee' - Submitted task run 'fail_task-d8c73196-0' for execution.
09:32:54.054 | INFO | Flow run 'cerulean-bumblebee' - Created task run 'good_task-2a08f303-0' for task 'good_task'
09:32:54.055 | INFO | Flow run 'cerulean-bumblebee' - Submitted task run 'good_task-2a08f303-0' for execution.
09:32:54.063 | ERROR | Task run 'fail_task-d8c73196-0' - A task failed
09:32:54.082 | INFO | Task run 'fail_task-d8c73196-0' - Finished in state Completed()
09:32:54.088 | INFO | Task run 'good_task-2a08f303-0' - Finished in state Completed()
09:32:54.103 | INFO | Flow run 'cerulean-bumblebee' - Finished in state Completed('All states completed.')
Jason Thomas
08/17/2022, 1:36 PMfrom prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
@task
def good_task():
return True
@task()
def fail_task():
raise Exception("Error raised for testing")
@flow()
def f1():
fail_task()
good_task()
@flow
def f2():
good_task()
if __name__ == '__main__':
f1()
f2()
print('******** all flows complete ***********')
And the desired output:
07:33:50.501 | INFO | prefect.engine - Created flow run 'purring-vole' for flow 'f1'
07:33:50.501 | INFO | Flow run 'purring-vole' - Using task runner 'ConcurrentTaskRunner'
07:33:52.031 | WARNING | Flow run 'purring-vole' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
07:33:53.052 | INFO | Flow run 'purring-vole' - Created task run 'fail_task-d8c73196-0' for task 'fail_task'
07:33:54.069 | INFO | Flow run 'purring-vole' - Created task run 'good_task-2a08f303-0' for task 'good_task'
07:33:55.150 | ERROR | Task run 'fail_task-d8c73196-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/engine.py", line 890, in orchestrate_task_run
result = await run_sync_in_interruptible_worker_thread(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "_test.py", line 10, in fail_task
raise Exception("Error raised for testing")
Exception: Error raised for testing
07:33:56.134 | ERROR | Task run 'fail_task-d8c73196-0' - Finished in state Failed('Task run encountered an exception.')
07:33:57.167 | INFO | Task run 'good_task-2a08f303-0' - Finished in state Completed()
07:33:58.164 | ERROR | Flow run 'purring-vole' - Finished in state Failed('1/2 states failed.')
07:33:59.380 | INFO | prefect.engine - Created flow run 'impartial-hummingbird' for flow 'f2'
07:33:59.380 | INFO | Flow run 'impartial-hummingbird' - Using task runner 'ConcurrentTaskRunner'
07:34:00.355 | WARNING | Flow run 'impartial-hummingbird' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
07:34:01.420 | INFO | Flow run 'impartial-hummingbird' - Created task run 'good_task-2a08f303-0' for task 'good_task'
07:34:02.485 | INFO | Task run 'good_task-2a08f303-0' - Finished in state Completed()
07:34:03.606 | INFO | Flow run 'impartial-hummingbird' - Finished in state Completed('All states completed.')
******** all flows complete ***********
Serina
08/17/2022, 2:18 PMfrom prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
@task
def good_task():
return True
@task()
def fail_task():
raise Exception("Error raised for testing")
@flow()
def f1():
fail_task()
good_task()
@flow
def f2():
good_task()
if __name__ == '__main__':
f1(return_state=True)
f2(return_state=True)
print('******** all flows complete ***********')
Jason Thomas
08/17/2022, 2:23 PM