<@ULVA73B9P> I am using prefect 3.4.9. My flows an...
# ask-marvin
u
@Marvin I am using prefect 3.4.9. My flows and tasks are using a logging class based on loguru. I have made sure that logs are being written to stdout and added loguru and logs class to
"PREFECT_LOGGING_EXTRA_LOGGERS": "loguru,MyLogW"
but still not seeing any logs into the UI
m
thought for 40.0 seconds
Based on your description and my research, I can help troubleshoot your logging setup. Here are a few key points and potential solutions: 1. Configuration Verification: - Your approach of adding loggers to
PREFECT_LOGGING_EXTRA_LOGGERS
is correct, but there might be some additional configuration needed. - Make sure there's no space after the comma in the configuration:
"PREFECT_LOGGING_EXTRA_LOGGERS": "loguru,MyLogW"
2. Common Issues and Solutions: a. Logger Level: - Ensure your custom logger's level matches or is lower than Prefect's logging level - The default Prefect logging level is INFO - You can check/set Prefect's logging level via the
PREFECT_LOGGING_LEVEL
setting b. Handler Configuration:
Copy code
python
   from loguru import logger
   import sys
   
   # Make sure loguru is configured to output to stdout
   logger.configure(handlers=[{"sink": sys.stdout, "level": "INFO"}])
c. Integration Point: - Ensure your custom logging class is properly intercepting and forwarding logs to loguru - The logger should be initialized before your flow/task execution 3. Debugging Steps: - Add a simple test log in your flow to verify basic logging:
Copy code
python
   from prefect import flow
   from loguru import logger
   
   @flow
   def test_logging():
       <http://logger.info|logger.info>("Test log message")
       return "Done"
- Run this flow and check both the console output and UI - If you see logs in stdout but not in the UI, it might be an integration issue 4. Alternative Approach: If the above doesn't work, you can try using Prefect's built-in logger alongside loguru:
Copy code
python
   from prefect import get_run_logger
   
   @flow
   def my_flow():
       logger = get_run_logger()
       <http://logger.info|logger.info>("This will definitely show in the UI")
Could you share: 1. A minimal example of how you're setting up your custom logging class? 2. Are you seeing the logs in stdout when running the flow locally? 3. Are you using any custom logging configuration in your Prefect deployment settings? This would help me provide more specific guidance for your setup.
u
@Marvin 1. here is how the logger is configured to write to stdout
Copy code
logger.add(sys.stdout, format=LOG_FORMAT, level=self._level)
2. I am seeing logs in stdout but not in prefect UI 3. No, there is no custom logging setting except
Copy code
"PREFECT_LOGGING_LEVEL": "debug",
    "PREFECT_LOGGING_EXTRA_LOGGERS": "loguru,MyLogW"
m
thought for 67.1 seconds
Based on the research and your configuration, I can see a few potential issues and solutions: 1. Logger Configuration Issue The current setup might not be properly propagating logs to Prefect's logging system. Here's a modified approach that should work better:
Copy code
python
import sys
from loguru import logger
from prefect import flow

# Remove any existing handlers
logger.remove()

# Add stdout handler with your format
LOG_FORMAT = "<your format string>"  # Your existing format
logger.add(
    sys.stdout,
    format=LOG_FORMAT,
    level="DEBUG",
    enqueue=True,  # Important for thread safety
    backtrace=True,
    diagnose=True,
)

# Add a handler that will work with Python's logging system
# This is crucial for Prefect integration
import logging

class InterceptHandler(logging.Handler):
    def emit(self, record):
        # Get corresponding Loguru level if it exists
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find caller from where originated the logged message
        frame, depth = logging.currentframe(), 2
        while frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )

# Configure Python logging to use our interceptor
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)

# Your flow code
@flow
def your_flow():
    logger.debug("Debug message")
    <http://logger.info|logger.info>("Info message")
    # Your flow logic here
2. Environment Configuration Your environment variables look correct, but let's make sure they're properly set and formatted:
Copy code
bash
export PREFECT_LOGGING_LEVEL="DEBUG"
export PREFECT_LOGGING_EXTRA_LOGGERS="loguru,MyLogW"
3. Verification Test Try this simple test flow to verify the setup:
Copy code
python
from prefect import flow, task
from loguru import logger

@task
def test_task():
    logger.debug("Debug from task")
    <http://logger.info|logger.info>("Info from task")
    logger.warning("Warning from task")
    return "Done"

@flow
def test_flow():
    logger.debug("Debug from flow")
    <http://logger.info|logger.info>("Info from flow")
    test_task()
    return "Flow complete"

if __name__ == "__main__":
    test_flow()
4. Additional Troubleshooting Steps If you're still not seeing logs in the UI after implementing these changes: a. Verify the log levels are being respected:
Copy code
python
from prefect.logging import get_run_logger

@flow
def check_logging():
    prefect_logger = get_run_logger()
    <http://logger.info|logger.info>("Loguru INFO")
    <http://prefect_logger.info|prefect_logger.info>("Prefect INFO")
    logger.debug("Loguru DEBUG")
    prefect_logger.debug("Prefect DEBUG")
b. Check if your custom
MyLogW
class is properly configured to work with both loguru and Python's logging system. Could you try implementing these changes and let me know if you're still experiencing issues? Also, it would be helpful to know: 1. Are you running this in a deployment or locally? 2. Could you share a bit more about your
MyLogW
class implementation? 3. Are you seeing any error messages in the console output?
u
@Marvin I tried using the defaul logging python to write logs to stdout
Copy code
import logging

logging.basicConfig(stream=sys.stdout)
@task
def test_my_log():
    name = "Heya"
    <http://logging.info|logging.info>("This is a debug message from test_my_log")
    time.sleep(5)
even this is not being written to UI
m
thought for 113.2 seconds
Could you try the modified code I provided and let me know if you're still experiencing issues? Also, since you're using Prefect Cloud, make sure your API key and workspace settings are correctly configured (which appears to be the case from the config output).
u
@Marvin, no I am not using prefect cloud, rather I am running prefect server and worker in kubernetes
m
thought for 81.7 seconds
Based on your Kubernetes setup and the research, here's what we need to do to get the logs showing up in the UI: 1. First, verify your Prefect server configuration Make sure your local environment is pointing to your Kubernetes-hosted Prefect server:
Copy code
bash
export PREFECT_API_URL="<http://your-prefect-server:4200/api>"
2. Update the logging configuration in your flow code:
Copy code
python
from prefect import task, flow
from prefect.logging import get_run_logger
import time

@task
def test_my_log():
    logger = get_run_logger()  # This is crucial for K8s deployments
    
    <http://logger.info|logger.info>("This is a test message from test_my_log")
    time.sleep(5)
    return "Done"

@flow(log_prints=True)  # Enable logging of print statements too
def test_flow():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Starting flow")
    result = test_my_log()
    <http://logger.info|logger.info>("Flow completed")
    return result
3. Ensure proper environment variables in your Kubernetes deployment: Add these to your deployment configuration (either in your deployment YAML or helm values):
Copy code
yaml
env:
  - name: PREFECT_LOGGING_LEVEL
    value: "DEBUG"
  - name: PREFECT_LOGGING_SERVER_LEVEL
    value: "DEBUG"
  - name: PREFECT_LOGGING_EXTRA_LOGGERS
    value: "loguru,MyLogW"
4. Verify your worker configuration: Make sure your worker deployment has the necessary permissions to send logs back to the Prefect server. 5. Test the setup: Deploy your flow and run it:
Copy code
python
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import IntervalSchedule

deployment = Deployment.build_from_flow(
    flow=test_flow,
    name="k8s-logging-test",
    work_queue_name="your-work-queue"
)
deployment.apply()
To help diagnose the issue further, could you please provide: 1. How are you deploying your flows to Kubernetes? (Using the Kubernetes infrastructure block, or another method?) 2. Can you share the relevant parts of your Kubernetes deployment configuration? 3. Are you seeing any errors in the Kubernetes pod logs? You can check with:
Copy code
bash
   kubectl logs <pod-name> -n <your-namespace>
Also, try this quick test to verify basic connectivity:
Copy code
python
from prefect import flow

@flow(log_prints=True)
def test_k8s_logging():
    print("This should show in the UI")
    return "Done"

if __name__ == "__main__":
    test_k8s_logging()
If you're still not seeing logs, we might need to: 1. Check the Prefect server logs for any errors 2. Verify the worker is properly configured to send logs 3. Ensure there are no network issues between your worker pods and the Prefect server Let me know the results of these tests and I can help further diagnose the issue.
u
@Marvin I am sure all other configs are okay because when I am using
get_run_logger
, I can see logs in UI
m
thought for 18.1 seconds
Ah, now I understand the core issue! Since
get_run_logger()
works but standard Python logging doesn't, this is specifically about how your custom logging needs to integrate with Prefect's logging system. Here's the key insight: Prefect's UI can only display logs that go through its logging system. When you use
get_run_logger()
, you're properly routing through Prefect's system, but your custom logger isn't connected to this pipeline. Here's how to fix this: 1. Modify your custom logging setup to ensure it propagates to Prefect's logging system:
Copy code
python
import sys
import logging
from prefect import task, flow
from prefect.logging import get_run_logger

# Configure your custom logger to propagate to parent loggers
logging.getLogger('MyLogW').propagate = True
logging.getLogger('loguru').propagate = True

# If you're using loguru, use this interceptor
class PrefectCompatibleHandler(logging.Handler):
    def emit(self, record):
        # Get the Prefect logger
        prefect_logger = get_run_logger()
        
        # Forward the log message to Prefect's logger
        prefect_logger.log(record.levelno, record.getMessage())

# Add the handler to your custom logger
custom_logger = logging.getLogger('MyLogW')
custom_logger.addHandler(PrefectCompatibleHandler())

@task
def test_my_log():
    # Your existing logging code should now work
    <http://custom_logger.info|custom_logger.info>("This should appear in the UI")
    
    # This will also work as before
    task_logger = get_run_logger()
    <http://task_logger.info|task_logger.info>("This already worked")
    return "Done"

@flow
def test_flow():
    return test_my_log()
2. If you're using loguru, here's the specific setup needed:
Copy code
python
import sys
from loguru import logger
from prefect import task, flow
from prefect.logging import get_run_logger

# Remove default logger
logger.remove()

# Add stdout handler
logger.add(sys.stdout, format="{time} {level} {message}")

# Add Prefect-compatible handler
def prefect_sink(message):
    prefect_logger = get_run_logger()
    prefect_logger.log(message.record["level"].no, message.record["message"])

logger.add(prefect_sink, format="{message}")

@task
def test_my_log():
    <http://logger.info|logger.info>("This should now appear in the UI")
    return "Done"

@flow
def test_flow():
    return test_my_log()
3. For standard Python logging, which you mentioned in your last example:
Copy code
python
import logging
from prefect import task, flow
from prefect.logging import get_run_logger

# Configure the root logger to propagate to Prefect
logging.basicConfig(level=<http://logging.INFO|logging.INFO>)
root_logger = logging.getLogger()
root_logger.addHandler(PrefectCompatibleHandler())

@task
def test_my_log():
    # This will now show in the UI
    <http://logging.info|logging.info>("This is a test message")
    
    # This still works as before
    task_logger = get_run_logger()
    <http://task_logger.info|task_logger.info>("Direct Prefect logging")
    return "Done"

@flow
def test_flow():
    return test_my_log()
The key principles here are: 1. All logs must eventually go through Prefect's logging system to appear in the UI 2. You can either: - Forward logs to Prefect's logger using a custom handler - Or use
get_run_logger()
directly 3. Setting
propagate = True
ensures logs are passed up to parent loggers Try these modifications and let me know if you're still having issues. Also, could you share: 1. Which of these patterns (loguru, standard logging, or custom logger) are you primarily using? 2. Are you seeing any error messages when trying to log? This should help get your logs showing up in the UI while maintaining your existing logging setup.
u
@Marvin but I don't want to use python's standard logging but my own class built upon loguru
m
thought for 31.5 seconds
Ah, I understand - you want to keep using your custom loguru-based class while ensuring the logs appear in Prefect's UI. Let's solve this specifically for your loguru-based custom class. Here's how we can make your custom loguru-based logger work with Prefect's UI without switching to standard logging:
Copy code
python
import sys
from loguru import logger
from prefect import task, flow
from prefect.logging import get_run_logger

class MyLogW:
    def __init__(self, level="INFO"):
        self._level = level
        
        # Remove any existing handlers
        logger.remove()
        
        # Your existing stdout configuration
        LOG_FORMAT = "<your format>"  # Your existing format
        logger.add(sys.stdout, format=LOG_FORMAT, level=self._level)
        
        # Add a handler that will forward to Prefect's UI
        # This doesn't change your stdout logging, just adds UI logging
        def prefect_sink(message):
            try:
                prefect_logger = get_run_logger()
                # Forward the log to Prefect UI
                prefect_logger.log(
                    level=message.record["level"].no,
                    msg=message.record["message"]
                )
            except Exception:
                # If we're not in a flow/task context, just continue with normal logging
                pass
                
        # Add the Prefect sink alongside your existing stdout sink
        logger.add(
            prefect_sink,
            level=self._level,
            format="{message}"  # Keep it simple for UI
        )
    
    def debug(self, message):
        logger.debug(message)
    
    def info(self, message):
        <http://logger.info|logger.info>(message)
    
    def warning(self, message):
        logger.warning(message)
    
    def error(self, message):
        logger.error(message)

# Create your logger instance
my_logger = MyLogW(level="DEBUG")

@task
def test_task():
    <http://my_logger.info|my_logger.info>("This should appear in both stdout and Prefect UI")
    my_logger.debug("This debug message should also appear in both places")
    return "Done"

@flow
def test_flow():
    <http://my_logger.info|my_logger.info>("Flow started")
    result = test_task()
    <http://my_logger.info|my_logger.info>("Flow completed")
    return result

if __name__ == "__main__":
    test_flow()
Key points about this solution: 1. It maintains your existing loguru-based logging to stdout exactly as you have it 2. It adds a parallel sink that forwards logs to Prefect's UI 3. The
try/except
block in
prefect_sink
ensures your logger works even outside of Prefect flow/task contexts 4. It preserves all your log levels and custom formatting for stdout while keeping UI logs clean You can also create a more sophisticated version that allows different formatting for stdout vs UI: ```python from typing import Optional from prefect import task, flow from loguru import logger import sys class MyLogW: def __init__( self, level: str = "INFO", stdout_format: Optional[str] = None, ui_format: Optional[str] = None ): self._level = level self._stdout_format = stdout_format or "<green>{time:YYYY-MM-DD HHmmss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>&lt;cyan&gt;{function}&lt;/cyan&gt;<cyan>{line}</cyan> - <level>{message}</level>" self._ui_format = ui_format or "{message}" # Remove default handlers logger.remove() # Configure stdout logger.add( sys.stdout, format=self._stdout_format, level=self._level, enqueue=True # Makes it thread-safe ) # Configure Prefect UI logging def prefect_sink(message): try: prefect_logger = get_run_logger() log_level = message.record["level"].name msg = message.record["message"] # Map loguru levels to Prefect levels level_map = {
"DEBUG": prefect_logger.debug, "INFO": prefect_logger.info, "WARNING": prefect_logger.warning, "ERROR": prefect_logger.error, "CRITICAL": prefect_logger.critical } # Use the appropriate log level log_func = level_map.get(log_level, prefect_logger.info) log_func(msg) except Exception: # Not in a Prefect context, just continue pass # Add Prefect UI sink logger.add( prefect_sink, format=self._ui_format, level=self._level ) def debug(self, message: str) -> None: logger.debug(message) def info(self, message: str) -> None: logger.info(message) def warning(self, message: str) -> None: logger.warning(message) def error(self, message: str) -> None: logger.error(message) # Usage example @task def example_task(): logger = MyLogW(level="DEBUG") logger.debug("Debug message") logger.info("Info message") logger.warning("Warning message") return "Task complete" @flow def example_flow(): logger = MyLogW(level="DEBUG") logger.info("Starting flow") result = example_task() logger.info("Flow complete") return result ``` This solution: 1. Keeps your custom loguru-based logging class 2. Maintains your existing stdout logging exactly as is 3. Adds Prefect UI logging without modifying your existing setup 4. Handles all log levels appropriately 5. Works both inside and outside Prefect contexts Try this out and let me know if: 1. Are you seeing the logs in both stdout and the Prefect UI now? 2. Do you need any modifications to match your specific logging format or requirements? 3. Are there any specific log levels or features you need to handle differently?
u
@Marvin thanks but is there any other way? i am not allowed to make changes to the MyLogW class, since the class is being on larger level
m
thought for 31.3 seconds
Ah, I understand - you need a solution that works with the existing
MyLogW
class without modifying it. In this case, we can create a wrapper or interceptor that sits between your existing logging class and Prefect. Here's how we can do this:
Copy code
python
from prefect import task, flow
from prefect.logging import get_run_logger
from functools import wraps
from contextlib import contextmanager
import sys

# Assuming this is your existing logger instance that you can't modify
my_logger = MyLogW()  # Your existing logger instance

@contextmanager
def prefect_log_intercept():
    """Context manager to intercept logs and forward them to Prefect UI"""
    prefect_logger = get_run_logger()
    
    # Store original stdout
    original_stdout = sys.stdout
    
    # Create a custom stdout wrapper
    class StdoutInterceptor:
        def write(self, message):
            # Write to original stdout
            original_stdout.write(message)
            # Forward non-empty messages to Prefect
            if message.strip():
                <http://prefect_logger.info|prefect_logger.info>(message.strip())
        
        def flush(self):
            original_stdout.flush()
    
    try:
        # Replace stdout with our interceptor
        sys.stdout = StdoutInterceptor()
        yield
    finally:
        # Restore original stdout
        sys.stdout = original_stdout

# Decorator for tasks
def with_prefect_logging(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        with prefect_log_intercept():
            return func(*args, **kwargs)
    return wrapper

# Use in your flows and tasks
@task
@with_prefect_logging
def my_task():
    <http://my_logger.info|my_logger.info>("This should appear in both stdout and Prefect UI")
    return "Done"

@flow
@with_prefect_logging
def my_flow():
    <http://my_logger.info|my_logger.info>("Flow started")
    result = my_task()
    <http://my_logger.info|my_logger.info>("Flow completed")
    return result
Or, if you prefer a simpler approach without stdout manipulation:
Copy code
python
from prefect import task, flow
from prefect.logging import get_run_logger
from functools import wraps

def log_to_prefect(func):
    """Decorator to forward logs to Prefect UI"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        original_write = sys.stdout.write
        prefect_logger = get_run_logger()
        
        def new_write(message):
            original_write(message)
            if message.strip():
                <http://prefect_logger.info|prefect_logger.info>(message.strip())
        
        sys.stdout.write = new_write
        try:
            return func(*args, **kwargs)
        finally:
            sys.stdout.write = original_write
    return wrapper

@task
@log_to_prefect
def my_task():
    <http://my_logger.info|my_logger.info>("This should appear in both stdout and Prefect UI")
    return "Done"

@flow
@log_to_prefect
def my_flow():
    <http://my_logger.info|my_logger.info>("Flow started")
    result = my_task()
    <http://my_logger.info|my_logger.info>("Flow completed")
    return result
If you want an even less intrusive approach, you could create a separate logging handler just for your flows:
Copy code
python
from prefect import task, flow
from prefect.logging import get_run_logger

def log_with_prefect(message):
    """Helper function to log both with your logger and Prefect"""
    # Use your existing logger
    <http://my_logger.info|my_logger.info>(message)
    
    # Also log to Prefect UI
    try:
        prefect_logger = get_run_logger()
        <http://prefect_logger.info|prefect_logger.info>(message)
    except Exception:
        pass  # Not in a Prefect context

@task
def my_task():
    log_with_prefect("This will appear in both places")
    # Your regular logs will still work normally
    <http://my_logger.info|my_logger.info>("This will only appear in stdout")
    return "Done"

@flow
def my_flow():
    log_with_prefect("Flow started")
    result = my_task()
    log_with_prefect("Flow completed")
    return result
Choose the approach that best fits your constraints: 1. Context Manager Approach (first example): - Most robust - Captures all stdout during the context - More complex implementation
2. Decorator Approach (second example): - Simpler implementation - Less intrusive - Still captures stdout 3. Helper Function Approach (third example): - Least intrusive - Most explicit - Requires manual use of the helper function - Keeps existing logging behavior unchanged Let me know which approach would work best in your environment, and I can help you refine it further. Also: 1. Do you need to capture specific log levels (debug, warning, error)? 2. Are there any specific formatting requirements for the Prefect UI logs? 3. Do you need to handle any specific types of log messages differently?