Marius Haberstock
06/29/2025, 3:56 PM
from prefect import flow, task
@task(name="failing_task")
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow")
def error_handling_flow():
    failing_task()
if __name__ == "__main__":
    # Serve the flow as a deployment; runs are then triggered from the UI or API
    error_handling_flow.serve("error_handling_flow-deployment")
When running it, I get these logs:
Completed submission of flow run 'e67fb22f-d72d-4c04-9155-15ee85a6c1ba'
05:50:51 PM
prefect.flow_runs.runner
Downloading flow code from storage at '.'
05:50:52 PM
prefect.flow_runs
Beginning flow run 'pompous-hog' for flow 'error_handling_flow'
05:50:52 PM
prefect.flow_runs
Task run failed with exception: ValueError('This is a simulated failure for testing purposes.') - No retries configured for this task.
Traceback (most recent call last):
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/task_engine.py", line 857, in run_context
yield self
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/task_engine.py", line 1487, in run_task_sync
engine.call_task_fn(txn)
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/task_engine.py", line 874, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/marius/Documents/repos/annotation_loading/annotation_loader/prefect_exp.py", line 8, in failing_task
raise ValueError("This is a simulated failure for testing purposes.")
ValueError: This is a simulated failure for testing purposes.
05:50:52 PM
failing_task-3c1
prefect.task_runs
Finished in state Failed('Task run encountered an exception ValueError: This is a simulated failure for testing purposes.')
05:50:52 PM
failing_task-3c1
prefect.task_runs
Encountered exception during execution: ValueError('This is a simulated failure for testing purposes.')
Traceback (most recent call last):
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 765, in run_context
yield self
...
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/marius/Documents/repos/annotation_loading/annotation_loader/prefect_exp.py", line 8, in failing_task
raise ValueError("This is a simulated failure for testing purposes.")
ValueError: This is a simulated failure for testing purposes.
05:50:52 PM
prefect.flow_runs
Finished in state Failed('Flow run encountered an exception: ValueError: This is a simulated failure for testing purposes.')
05:50:52 PM
prefect.flow_runs
Engine execution exited with unexpected exception
Traceback (most recent call last):
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 1530, in run_flow
ret_val = run_flow_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 1375, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 351, in result
raise self._raised
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 765, in run_context
yield self
...
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/marius/Documents/repos/annotation_loading/annotation_loader/prefect_exp.py", line 8, in failing_task
raise ValueError("This is a simulated failure for testing purposes.")
ValueError: This is a simulated failure for testing purposes.
05:50:52 PM
prefect.flow_runs
Process for flow run 'pompous-hog' exited with status code: 1
This is a lot of text for such simple code. Is there a way to reduce the output so that I can see directly what went wrong, without all the exceptions being duplicated?
Marvin
06/29/2025, 3:56 PM
python
from prefect import flow, task
def log_task_failure(task, task_run, state):
    # Prefect calls state hooks with (task, task_run, state), not (task, exc)
    print(f"Task {task_run.name!r} failed: {state.message}")
@task(
    name="failing_task",
    retries=0,  # Explicitly set retries to 0 (this is also the default)
    on_failure=[log_task_failure],  # Custom failure hook
)
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(
    name="error_handling_flow",
    log_prints=True,  # Forward print() output from the flow run to the logs
)
def error_handling_flow():
    failing_task()
if __name__ == "__main__":
    error_handling_flow.serve("error_handling_flow-deployment")
2. Adjust Log Level
You can control the verbosity by setting the log level. Add this at the start of your script:
python
import logging
from prefect.logging import get_run_logger
# Set logging level to only show ERROR and above
logging.getLogger("prefect").setLevel(logging.ERROR)
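Why a single `setLevel` call on a parent logger can quiet its children: a minimal standard-library sketch. The logger name `demo` is a stand-in for a top-level library logger such as `prefect`. Whether Prefect later reconfigures its loggers when a run starts (and so overrides this call) is not established here and should be treated as an open assumption.

```python
import logging


def effective_levels(parent: str, children: list[str], level: int) -> dict[str, int]:
    """Set `level` on the parent logger and report what each child resolves to."""
    logging.getLogger(parent).setLevel(level)
    # Children with no level of their own inherit the nearest ancestor's level
    return {name: logging.getLogger(name).getEffectiveLevel() for name in children}


# "demo" is a hypothetical stand-in for a library's top-level logger
levels = effective_levels("demo", ["demo.flow_runs", "demo.task_runs"], logging.ERROR)
for name, level in levels.items():
    print(name, logging.getLevelName(level))
```

A child logger that has an explicit level set on it would not inherit the parent's level, so this only helps when the children are left at `NOTSET`.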
3. Use Custom Exception Handling
For even more control, you can wrap the task in a try-except block:
python
from prefect import flow, task
from prefect.logging import get_run_logger
@task(name="failing_task")
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow")
def error_handling_flow():
    try:
        failing_task()
    except Exception as e:
        logger = get_run_logger()
        logger.error(f"Task failed: {str(e)}")
        raise  # Re-raise if you still want the flow to fail
if __name__ == "__main__":
    error_handling_flow.serve("error_handling_flow-deployment")
The key benefits of these approaches:
- More focused error messages
- Control over what information gets logged
- Cleaner output in the UI and logs
- Ability to add custom error handling logic
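As one way to get "more focused error messages", a custom formatter can collapse an attached exception to its final `Type: message` line. This is a standard-library sketch only: `OneLineExceptionFormatter` and the logger name `demo.compact` are made up for illustration, and it only helps if the traceback reaches the handler as `exc_info` (an assumption about how the library logs failures).

```python
import io
import logging
import traceback


class OneLineExceptionFormatter(logging.Formatter):
    """Render an attached exception as 'Type: message' instead of a full traceback."""

    def formatException(self, ei):
        # format_exception_only returns just the final "Type: message" line(s)
        return "".join(traceback.format_exception_only(ei[0], ei[1])).strip()


def demo() -> str:
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(OneLineExceptionFormatter("%(levelname)s | %(message)s"))
    logger = logging.getLogger("demo.compact")  # hypothetical logger name
    logger.handlers = [handler]
    logger.propagate = False
    try:
        raise ValueError("This is a simulated failure for testing purposes.")
    except ValueError:
        logger.exception("Task run failed")  # attaches the active exception
    return stream.getvalue()


output = demo()
print(output)
```

The trade-off is that the file and line number are lost, so this suits console output better than logs kept for debugging.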
Remember that while reducing log verbosity can make output cleaner, you might want to keep detailed logs in production environments for debugging purposes. The duplicated stack traces you're seeing are actually coming from different layers of Prefect's execution model (task level, flow level, and engine level), which can be useful for debugging complex issues.
Marius Haberstock
06/29/2025, 4:00 PM
Marvin
06/29/2025, 4:01 PM
python
from prefect import flow, task
from prefect.context import get_run_context
@task(name="failing_task")
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow", log_prints=True)
def error_handling_flow():
    # Using suppress_task_log_prints=True will prevent duplicate error logs from the task level
    with get_run_context() as context:
        context.task_run.suppress_task_log_prints = True
        failing_task()
if __name__ == "__main__":
    error_handling_flow.serve("error_handling_flow-deployment")
You're absolutely right - we want to:
1. Keep the flow's failed state (important for monitoring and orchestration)
2. See the actual error
3. Avoid the duplicated stack traces
The suppress_task_log_prints=True setting tells Prefect to suppress the duplicate task-level logs while still maintaining the flow-level error reporting. This way, you'll see the error clearly, the flow will still fail appropriately, but you won't get the same error logged multiple times.
If you want even more control over the error output while still maintaining the failed state, you can use:
python
from prefect import flow, task
from prefect.context import get_run_context
@task(name="failing_task")
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow", log_prints=True)
def error_handling_flow():
    with get_run_context() as context:
        context.task_run.suppress_task_log_prints = True
        try:
            failing_task()
        except Exception as e:
            # This will show just the error without the stack trace
            print(f"Flow failed: {type(e).__name__}: {str(e)}")
            raise  # Re-raise to maintain the failed state
This will give you much cleaner logs while still properly indicating failure in Prefect's state tracking system.
Marius Haberstock
06/29/2025, 4:16 PM
Runner 'error_handling_flow-deployment' submitting flow run 'ea3fdbab-71d1-4863-9a19-193a5340f95f'
06:10:49 PM
prefect.flow_runs.runner
Opening process...
06:10:49 PM
prefect.flow_runs.runner
Completed submission of flow run 'ea3fdbab-71d1-4863-9a19-193a5340f95f'
06:10:49 PM
prefect.flow_runs.runner
Downloading flow code from storage at '.'
06:10:51 PM
prefect.flow_runs
Beginning flow run 'nifty-labradoodle' for flow 'error_handling_flow'
06:14:36 PM
prefect.flow_runs
Encountered exception during execution: RuntimeError('Context already entered. Context enter calls cannot be nested.')
Traceback (most recent call last):
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 765, in run_context
yield self
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 1373, in run_flow_sync
engine.call_flow_fn()
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 785, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/marius/Documents/repos/annotation_loading/annotation_loader/prefect_exp.py", line 11, in error_handling_flow
with get_run_context() as context:
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/context.py", line 164, in __enter__
raise RuntimeError(
RuntimeError: Context already entered. Context enter calls cannot be nested.
06:14:54 PM
prefect.flow_runs
Finished in state Failed('Flow run encountered an exception: RuntimeError: Context already entered. Context enter calls cannot be nested.')
06:14:54 PM
prefect.flow_runs
Engine execution exited with unexpected exception
Traceback (most recent call last):
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 1530, in run_flow
ret_val = run_flow_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 1375, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 351, in result
raise self._raised
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 765, in run_context
yield self
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 1373, in run_flow_sync
engine.call_flow_fn()
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/flow_engine.py", line 785, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/marius/Documents/repos/annotation_loading/annotation_loader/prefect_exp.py", line 11, in error_handling_flow
with get_run_context() as context:
File "/home/marius/miniconda3/envs/annotation_loader/lib/python3.11/site-packages/prefect/context.py", line 164, in __enter__
raise RuntimeError(
RuntimeError: Context already entered. Context enter calls cannot be nested.
06:14:54 PM
prefect.flow_runs
Process for flow run 'nifty-labradoodle' exited with status code: 1
Marvin
06/29/2025, 4:16 PM
python
from prefect import flow, task
@task(name="failing_task", suppress_task_log_prints=True)
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow")
def error_handling_flow():
    failing_task()
if __name__ == "__main__":
    error_handling_flow.serve("error_handling_flow-deployment")
This moves the log suppression to the task decorator where it belongs, avoiding the context management issue entirely. The flow will still fail appropriately, but with cleaner logs.
If you want even more control over the error output while maintaining the failed state, you could use:
python
from prefect import flow, task
from prefect.logging import get_run_logger
@task(name="failing_task", suppress_task_log_prints=True)
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow")
def error_handling_flow():
    logger = get_run_logger()
    try:
        failing_task()
    except Exception as e:
        logger.error(f"Flow failed: {type(e).__name__}: {str(e)}")
        raise
if __name__ == "__main__":
    error_handling_flow.serve("error_handling_flow-deployment")
Both approaches will give you cleaner logs while properly maintaining the flow's failed state. The second version gives you more control over the exact error message format.
Marius Haberstock
06/29/2025, 4:18 PM
Marvin
06/29/2025, 4:18 PM
python
from prefect import flow, task
import logging
from prefect.logging import get_run_logger
# Configure logging to be less verbose
logging.getLogger("prefect.task_runs").propagate = False
@task(name="failing_task")
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow")
def error_handling_flow():
    logger = get_run_logger()
    try:
        failing_task()
    except Exception as e:
        logger.error(f"Task failed: {str(e)}")
        raise  # Re-raise to maintain the failed state
if __name__ == "__main__":
    error_handling_flow.serve("error_handling_flow-deployment")
This approach:
1. Prevents task-level logs from propagating up, reducing duplicates
2. Maintains the flow's failed state
3. Provides clear error messages
4. Uses the proper logging configuration for Prefect 3.x
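To make the propagation point concrete, here is a standard-library sketch of how a record gets emitted twice when both a child logger and its parent have handlers, and once when `propagate` is off. The logger names `demo_dup` and `demo_dup.task_runs` are stand-ins; which of Prefect's loggers actually carry handlers is not established in this thread.

```python
import io
import logging


def count_emits(propagate: bool) -> int:
    """Log one error on a child logger and count how often it is written."""
    stream = io.StringIO()
    parent = logging.getLogger("demo_dup")            # hypothetical parent logger
    child = logging.getLogger("demo_dup.task_runs")   # hypothetical child logger
    parent.handlers = [logging.StreamHandler(stream)]
    child.handlers = [logging.StreamHandler(stream)]
    parent.propagate = False  # keep the demo isolated from the root logger
    parent.setLevel(logging.INFO)
    child.propagate = propagate
    child.error("Task run failed")
    return stream.getvalue().count("Task run failed")


print(count_emits(propagate=True))   # 2: child handler + parent handler
print(count_emits(propagate=False))  # 1: child handler only
```

Note the flip side: if the child logger has no handler of its own, turning propagation off silences it completely rather than de-duplicating it.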
For even more control, you can create a logging.yml file in your project directory:
yaml
version: 1
disable_existing_loggers: false
formatters:
  simple:
    format: "%(asctime)s | %(message)s"
    datefmt: "%H:%M:%S"
handlers:
  console:
    class: logging.StreamHandler
    formatter: simple
    stream: ext://sys.stdout
loggers:
  prefect.task_runs:
    level: ERROR
    handlers: [console]
    propagate: false
root:
  level: INFO
  handlers: [console]
Then set the environment variable PREFECT_LOGGING_SETTINGS_PATH to point to this file before running your script.
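The YAML above follows Python's standard `logging.config` dictionary schema, so its effect can be checked in plain Python before involving Prefect at all. The sketch below writes the same settings as a dict and applies them with `dictConfig`; how Prefect itself loads and merges the file is not shown here and is left as an assumption to verify.

```python
import logging
import logging.config

# Same structure as the logging.yml above, expressed as a Python dict
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "simple": {"format": "%(asctime)s | %(message)s", "datefmt": "%H:%M:%S"},
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "simple",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        "prefect.task_runs": {"level": "ERROR", "handlers": ["console"], "propagate": False},
    },
    "root": {"level": "INFO", "handlers": ["console"]},
}

logging.config.dictConfig(LOGGING_CONFIG)
task_logger = logging.getLogger("prefect.task_runs")
task_logger.info("hidden: below the ERROR threshold")
task_logger.error("shown once: handled by console, not propagated to root")
```

Because this file defines only a console handler, any handler that a library's default configuration would otherwise provide is absent unless it is added back explicitly.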
This will give you cleaner, more focused error output while still maintaining proper flow state management.
Marius Haberstock
06/29/2025, 4:21 PM
Marvin
06/29/2025, 4:21 PM
python
from prefect import flow, task
from prefect.logging import get_run_logger
@task(name="failing_task", retries=0)
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow", log_prints=True)
def error_handling_flow():
    try:
        failing_task()
    except Exception as e:
        print(f"Flow failed: {e}")  # This will show in logs due to log_prints=True
        raise e from None  # This suppresses the traceback chaining
if __name__ == "__main__":
    error_handling_flow.serve("error_handling_flow-deployment")
The key changes here are:
1. Using raise e from None to prevent exception chaining
2. Setting retries=0 explicitly on the task to avoid retry-related logs
3. Using a simple print statement with log_prints=True to ensure we see the error clearly
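It may help to see exactly what `from None` changes. The standard-library sketch below formats a wrapped exception with and without it: `from None` hides the chained "During handling of the above exception" section, but the traceback of the raised exception itself is still printed. The function name `format_wrapped` is made up for the demo. When the same exception object is re-raised, as in the snippet above, there may be no earlier context to hide, so the visible effect could be small.

```python
import traceback


def format_wrapped(suppress_context: bool) -> str:
    """Raise a RuntimeError while handling a ValueError and return the formatted traceback."""
    try:
        try:
            raise ValueError("original failure")
        except ValueError:
            if suppress_context:
                raise RuntimeError("wrapped failure") from None
            raise RuntimeError("wrapped failure")
    except RuntimeError as exc:
        return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))


chained = format_wrapped(suppress_context=False)
suppressed = format_wrapped(suppress_context=True)
print("original failure" in chained)     # True: the ValueError is shown as context
print("original failure" in suppressed)  # False: context hidden by `from None`
```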
If you still see too much output, we can try one more approach that's even more direct:
python
from prefect import flow, task
from prefect.logging import get_run_logger
import sys
# Suppress traceback globally
sys.tracebacklimit = 0
@task(name="failing_task")
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow")
def error_handling_flow():
    failing_task()
if __name__ == "__main__":
    error_handling_flow.serve("error_handling_flow-deployment")
This second version uses Python's built-in traceback limiting to suppress the stack traces entirely while still maintaining the flow's failed state. Let me know if either of these approaches gives you the cleaner output you're looking for.
Marius Haberstock
06/29/2025, 4:24 PM
Marvin
06/29/2025, 4:25 PM
prefect.yaml in your project directory:
yaml
logging:
  version: 1
  handlers:
    console:
      class: logging.StreamHandler
      formatter: default
      stream: ext://sys.stdout
  formatters:
    default:
      format: "%(asctime)s | %(message)s"
      datefmt: "%H:%M:%S"
  loggers:
    prefect:
      level: ERROR
      handlers: [console]
      propagate: false
    prefect.flow_runs:
      level: ERROR
      handlers: [console]
      propagate: false
    prefect.task_runs:
      level: ERROR
      handlers: [console]
      propagate: false
  root:
    handlers: [console]
    level: ERROR
Then set this environment variable before running your script:
bash
export PREFECT_LOGGING_SETTINGS_PATH=./prefect.yaml
Your Python code can remain simple:
python
from prefect import flow, task
@task(name="failing_task")
def failing_task():
    raise ValueError("This is a simulated failure for testing purposes.")
@flow(name="error_handling_flow")
def error_handling_flow():
    failing_task()
if __name__ == "__main__":
    error_handling_flow.serve("error_handling_flow-deployment")
This configuration will:
1. Show the error traceback only once
2. Maintain the full traceback so you can see the source of the error
3. Keep the flow's failed state
4. Reduce duplicate logging across different Prefect components
The key is that we're configuring the logging system to handle errors at the appropriate level while preventing propagation that causes duplicate messages.
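If the goal is specifically "show the error traceback only once", a logging filter is another option to consider. The sketch below uses only the standard library: it keeps the traceback the first time a given exception (type and message) is logged and strips it from later records, while leaving the messages themselves intact. `DedupTracebackFilter` and the logger name `demo.dedup` are invented for illustration, and it assumes the traceback arrives as `exc_info` on the record rather than being pre-rendered into the message text.

```python
import io
import logging


class DedupTracebackFilter(logging.Filter):
    """Keep the traceback for the first record of each exception; drop it on repeats."""

    def __init__(self) -> None:
        super().__init__()
        self._seen = set()  # (exception type name, message) pairs already shown

    def filter(self, record: logging.LogRecord) -> bool:
        if record.exc_info and record.exc_info[1] is not None:
            exc = record.exc_info[1]
            key = (type(exc).__name__, str(exc))
            if key in self._seen:
                # Same failure again: keep the log line, drop the traceback
                record.exc_info = None
                record.exc_text = None
            else:
                self._seen.add(key)
        return True  # never discard the record itself


def demo() -> str:
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.addFilter(DedupTracebackFilter())
    logger = logging.getLogger("demo.dedup")  # hypothetical logger name
    logger.handlers = [handler]
    logger.propagate = False
    try:
        raise ValueError("This is a simulated failure for testing purposes.")
    except ValueError as exc:
        # Simulate three layers reporting the same exception
        for layer in ("task", "flow", "engine"):
            logger.error("%s layer saw a failure", layer, exc_info=exc)
    return stream.getvalue()


output = demo()
print(output.count("Traceback (most recent call last):"))  # 1
```

Attaching the filter to a handler rather than a logger means it applies to every record that handler receives, regardless of which logger emitted it.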