Hi <@ULVA73B9P>, I'm trying to format logs at the ...
# ask-marvin
p
Hi @Marvin, I'm trying to format logs at the end of my master flow using client.read_logs, and from that using flow_run_id and task_run_id to get flow and task names programmatically (and other information). Problem It seems, however, that on the UI (and within a python flow client) that I can't get all the logs of child flows within parent flows to be together. Only the tasks with the parent flow show logs and not subflows. This means I can't get ALL logs of a parent flow and then format it for like a 'success email' for the entire flow. Unless someone has a better idea that gives me an 'overall report' of a flow and all of it is subflows I am open to ideas here is my code
Copy code
@task
async def check_logs():
    async with get_client() as client:
        # Ensure any pending logs are sent
        await APILogHandler.aflush()
        logs = await client.read_logs(
            LogFilter(flow_run_id={"any_": [runtime.flow_run.id]})
        )

        records = []
        for log in logs:
            
            # Gets task run and flow run info
            if log.task_run_id is not None:
                task_runs = await client.read_task_runs(
                    task_run_filter=TaskRunFilter(id=TaskRunFilterId(any_=[log.task_run_id]))
                )
                task_run = task_runs[0]
                task_run_name = task_run.name
                print("-------------------- Task Run Details ----------------------------")
                print(task_run)
                print("------------------------------------------------------------------")
                if task_run.flow_run_id is not None:
                    flow_runs = await client.read_flow_runs(
                        flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=[task_run.flow_run_id]))
                    )
                    flow_run = flow_runs[0]
                    print("-------------------- Flow Run Details ----------------------------")
                    print(flow_run)
                    print("------------------------------------------------------------------")
                    if flow_run.flow_id is not None:
                        flow = await client.read_flow(flow_run.flow_id)
                        print(f"Flow name is {flow.name}")

            print(log)


@flow(log_prints=True)
def daily_flow():
    
    ## A subflow
    some_subflow()
    
    ## A task
    some_task()
    
    check_logs.submit()
    
    # only logs of the some_task are printed by the check_logs function, not the some_subflow function
m
thought for 33.0 seconds
Let me help you with getting comprehensive logs for flows including their subflows. You're right that this is a common need for generating complete flow run reports. For Prefect 3.x, here's how you can modify your approach to get all logs, including those from subflows:
Copy code
python
from prefect import flow, task, get_client
from prefect.logging.handlers import APILogHandler
from prefect.client.schemas import LogFilter, TaskRunFilter, FlowRunFilter
from prefect.client.schemas.filters import FlowRunFilterId
import asyncio

@task
async def get_all_flow_run_logs(parent_flow_run_id: str):
    async with get_client() as client:
        # Ensure any pending logs are sent
        await APILogHandler.aflush()
        
        # First, get all subflow runs
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_flow_run_id={"any_": [parent_flow_run_id]}
            )
        )
        
        # Include the parent flow run ID in our list of flow runs to fetch
        all_flow_run_ids = [parent_flow_run_id] + [run.id for run in flow_runs]
        
        # Get logs for all flow runs (parent and children)
        logs = await client.read_logs(
            LogFilter(flow_run_id={"any_": all_flow_run_ids})
        )
        
        # Process logs
        formatted_logs = []
        for log in logs:
            if log.task_run_id:
                task_runs = await client.read_task_runs(
                    task_run_filter=TaskRunFilter(id={"any_": [log.task_run_id]})
                )
                if task_runs:
                    task_run = task_runs[0]
                    flow_runs = await client.read_flow_runs(
                        flow_run_filter=FlowRunFilter(id={"any_": [task_run.flow_run_id]})
                    )
                    if flow_runs:
                        flow_run = flow_runs[0]
                        flow = await client.read_flow(flow_run.flow_id)
                        
                        formatted_logs.append({
                            "timestamp": log.timestamp,
                            "flow_name": flow.name,
                            "task_name": task_run.name,
                            "message": log.message,
                            "level": log.level
                        })
            else:
                # Handle flow-level logs
                formatted_logs.append({
                    "timestamp": log.timestamp,
                    "message": log.message,
                    "level": log.level
                })
                
        return formatted_logs

@flow
async def parent_flow():
    # Your flow logic here
    await subflow()
    await some_task()
    
    # Get all logs at the end
    logs = await get_all_flow_run_logs(parent_flow_run_id=flow_run.id)
    
    # Example: Print formatted logs
    for log in sorted(logs, key=lambda x: x["timestamp"]):
        print(f"[{log['timestamp']}] {log.get('flow_name', 'FLOW')} - "
              f"{log.get('task_name', 'N/A')} - {log['message']}")
Key improvements in this approach: 1. It uses
parent_flow_run_id
to fetch all subflow runs associated with the parent flow 2. Collects logs from both the parent flow and all subflows 3. Organizes logs with relevant metadata (flow name, task name, timestamp) 4. Returns a structured format that you can easily use for email reports or other purposes A few important notes: - The logs are returned in a structured format that you can easily customize for your needs - You might want to add error handling and pagination for large numbers of logs - Consider adding filters for log levels if you only want specific types of logs - The timestamps in the logs can be used to sort everything chronologically If you need to generate an email report, you could easily extend this by adding a formatting function: ```python def format_logs_for
❤️ 1
p
@Marvin I'm using a synchronous parent flow. Can I do get_all_flow_run_logs(parent_flow).submit()?
m
thought for 14.4 seconds
Yes, you can use
.submit()
with an async task in a synchronous flow. Here's how to modify the approach for a synchronous parent flow:
Copy code
python
from prefect import flow, task, get_client
from prefect.logging.handlers import APILogHandler
from prefect.client.schemas import LogFilter, TaskRunFilter, FlowRunFilter
from prefect import runtime

@task
async def get_all_flow_run_logs(parent_flow_run_id: str):
    async with get_client() as client:
        # Ensure any pending logs are sent
        await APILogHandler.aflush()
        
        # First, get all subflow runs
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_flow_run_id={"any_": [parent_flow_run_id]}
            )
        )
        
        # Include the parent flow run ID in our list of flow runs to fetch
        all_flow_run_ids = [parent_flow_run_id] + [run.id for run in flow_runs]
        
        # Get logs for all flow runs (parent and children)
        logs = await client.read_logs(
            LogFilter(flow_run_id={"any_": all_flow_run_ids})
        )
        
        # Process logs
        formatted_logs = []
        for log in logs:
            if log.task_run_id:
                task_runs = await client.read_task_runs(
                    task_run_filter=TaskRunFilter(id={"any_": [log.task_run_id]})
                )
                if task_runs:
                    task_run = task_runs[0]
                    flow_runs = await client.read_flow_runs(
                        flow_run_filter=FlowRunFilter(id={"any_": [task_run.flow_run_id]})
                    )
                    if flow_runs:
                        flow_run = flow_runs[0]
                        flow = await client.read_flow(flow_run.flow_id)
                        
                        formatted_logs.append({
                            "timestamp": log.timestamp,
                            "flow_name": flow.name,
                            "task_name": task_run.name,
                            "message": log.message,
                            "level": log.level
                        })
            else:
                # Handle flow-level logs
                formatted_logs.append({
                    "timestamp": log.timestamp,
                    "message": log.message,
                    "level": log.level
                })
                
        return formatted_logs

@flow
def parent_flow():
    # Your flow logic here
    subflow()
    some_task()
    
    # Get all logs at the end - using .submit()
    future = get_all_flow_run_logs.submit(parent_flow_run_id=runtime.flow_run.id)
    
    # If you need the results immediately:
    logs = future.result()
    
    # Example: Print formatted logs
    for log in sorted(logs, key=lambda x: x["timestamp"]):
        print(f"[{log['timestamp']}] {log.get('flow_name', 'FLOW')} - "
              f"{log.get('task_name', 'N/A')} - {log['message']}")
A few important points: 1. The async task can be submitted using
.submit()
in a synchronous flow - Prefect handles the async/sync conversion internally 2. If you need the results immediately, you can call
.result()
on the future 3. If you don't need the results immediately (e.g., if you're just sending them to another task), you can pass the future directly You could also chain it with another task:
Copy code
python
@task
def send_email_report(logs):
    # Format and send email with logs
    pass

@flow
def parent_flow():
    subflow()
    some_task()
    
    logs_future = get_all_flow_run_logs.submit(parent_flow_run_id=runtime.flow_run.id)
    send_email_report.submit(logs_future)
This way, the
p
@Marvin this works great thank you. How do I now format the log infomation I have collected into a nice email. as a reminder formatted_logs is a list of dictionaries appended here in this step formatted_logs._append_({ "timestamp": log.timestamp, "flow_name": flow.name, "task_name": task_run.name, "message": log.message, "level": log.level }) else: # Handle flow-level logs formatted_logs._append_({ "timestamp": log.timestamp, "message": log.message, "level": log.level })
m
thought for 18.0 seconds
I'll help you create a nicely formatted email from your log data. Here's a solution that creates an HTML email with proper formatting and color-coding for different log levels: ```python from prefect import task from email.mime.text import MIMETextBase from email.mime.multipart import MIMEMultipart import smtplib from datetime import datetime import pytz @task def format_logs_to_html(formatted_logs, flow_name: str) -> str: # Color mapping for different log levels level_colors = { "ERROR": "#ffebee", # Light red "WARNING": "#fff3e0", # Light orange "INFO": "#f1f8e9", # Light green "DEBUG": "#f5f5f5" # Light grey } # Sort logs by timestamp sorted_logs = sorted(formatted_logs, key=lambda x: x["timestamp"]) # Create HTML header html = f""" <html> <head> <style> body {{ font-family: Arial, sans-serif; margin: 20px; }} .header {{ background-color: #f8f9fa; padding: 20px; margin-bottom: 20px; }} .log-entry {{ padding: 10px; margin: 5px 0; border-radius: 4px; }} .timestamp {{ color: #666; font-size: 0.9em; }} .flow-name {{ font-weight: bold; color: #2196F3; }} .task-name {{ color: #673AB7; }} .message {{ margin-top: 5px; }} </style> </head> <body> <div class="header"> <h2>Flow Run Report: {flow_name}</h2> <p>Generated at: {datetime.now(pytz.UTC).strftime('%Y-%m-%d %H:%M:%S UTC')}</p> </div> """ # Group logs by flow current_flow = None for log in sorted_logs: timestamp = log["timestamp"].strftime("%Y-%m-%d %H:%M:%S UTC") level = log.get("level", "INFO") bg_color = level_colors.get(level, "#ffffff") # Check if we're starting a new flow section flow_name = log.get("flow_name") if flow_name and flow_name != current_flow: if current_flow is not None: html += "</div>" current_flow = flow_name html += f'<h3>Flow: {flow_name}</h3><div class="flow-section">' html += f""" <div class="log-entry" style="background-color: {bg_color}"> <span class="timestamp">{timestamp}</span> """ if "task_name" in log: html += f'<br><span class="task-name">Task: {log["task_name"]}</span>' html += f""" <div class="message">{log["message"]}</div> </div> """ html += "</div></body></html>" return html @task def send_email(html_content: str, subject: str, recipient: str): # Email configuration smtp_server = "your_smtp_server" smtp_port = 587 sender_email = "your_email@example.com" password = "your_password" # Consider using Prefect secrets for this # Create message message = MIMEMultipart("alternative") message["Subject"] = subject message["From"] = sender_email message["To"] = recipient # Attach HTML content message.attach(MIMETextBase(html_content, "html")) # Send email with smtplib.SMTP(smtp_server, smtp_port) as server: server.starttls() server.login(sender_email, password) server.send_message(message)
p
@Marvin I have some dbt logs being passed through to the log of a task. In the UI it maintains the dbt log format, but using this html format you suggested, the lines and indents are lost and all crammed together. How do i retain the dbt run logs through the html code?
m
thought for 15.1 seconds
Ah yes, to preserve the formatting of DBT logs (or any pre-formatted text) in HTML, we need to use either
<pre>
tags or preserve whitespace using CSS. Here's the modified version that will maintain your DBT log formatting:
Copy code
python
from prefect import task
from email.mime.text import MIMETextBase
from email.mime.multipart import MIMEMultipart
import smtplib
from datetime import datetime
import pytz
import html

@task
def format_logs_to_html(formatted_logs, flow_name: str) -> str:
    level_colors = {
        "ERROR": "#ffebee",    
        "WARNING": "#fff3e0",  
        "INFO": "#f1f8e9",     
        "DEBUG": "#f5f5f5"     
    }
    
    sorted_logs = sorted(formatted_logs, key=lambda x: x["timestamp"])
    
    html = f"""
    <html>
    <head>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .header {{ background-color: #f8f9fa; padding: 20px; margin-bottom: 20px; }}
            .log-entry {{ padding: 10px; margin: 5px 0; border-radius: 4px; }}
            .timestamp {{ color: #666; font-size: 0.9em; }}
            .flow-name {{ font-weight: bold; color: #2196F3; }}
            .task-name {{ color: #673AB7; }}
            .message {{ 
                margin-top: 5px;
                white-space: pre-wrap;  /* Preserves whitespace and wraps */
                font-family: monospace; /* Better for log output */
                font-size: 12px;
            }}
            pre {{
                margin: 0;
                white-space: pre-wrap;
                word-wrap: break-word;
                font-family: monospace;
                font-size: 12px;
            }}
        </style>
    </head>
    <body>
        <div class="header">
            <h2>Flow Run Report: {flow_name}</h2>
            <p>Generated at: {datetime.now(pytz.UTC).strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
        </div>
    """
    
    current_flow = None
    for log in sorted_logs:
        timestamp = log["timestamp"].strftime("%Y-%m-%d %H:%M:%S UTC")
        level = log.get("level", "INFO")
        bg_color = level_colors.get(level, "#ffffff")
        
        # Check if we're starting a new flow section
        flow_name = log.get("flow_name")
        if flow_name and flow_name != current_flow:
            if current_flow is not None:
                html += "</div>"
            current_flow = flow_name
            html += f'<h3>Flow: {flow_name}</h3><div class="flow-section">'
        
        html += f"""
        <div class="log-entry" style="background-color: {bg_color}">
            <span class="timestamp">{timestamp}</span>
        """
        
        if "task_name" in log:
            html += f'<br><span class="task-name">Task: {log["task_name"]}</span>'
        
        # Escape HTML entities and wrap message in pre tags
        escaped_message = html.escape(log["message"])
        html += f"""
            <div class="message"><pre>{escaped_message}</pre></div>
        </div>
        """
    
    html += "</div></body></html>"
    return html
Key changes made to preserve formatting: 1. Added
white-space: pre-wrap
to preserve whitespace while still allowing wrapping 2. Used
<pre>
tags around the message content 3. Added
html.escape()
to properly escape any HTML characters in the logs 4. Set
font-family: monospace
for better log readability 5. Added specific styling for
pre
tags to ensure consistent formatting This should now properly display