Pav Staz
07/01/2025, 9:25 AM@task
async def check_logs():
    """Print every log of the current flow run and of all of its nested subflow runs.

    Logs written inside a subflow are stored under the subflow's own flow run
    id, so the current run's subflow tree is walked first and logs are then
    read for every run in it. For each task-run log, the task run, its flow
    run and the flow name are printed before the log itself.
    """
    # NOTE(review): assumed to equal the server's maximum page size
    # (PREFECT_API_DEFAULT_LIMIT, 200 by default) - confirm for your server.
    page_size = 200

    async with get_client() as client:

        async def read_all(read, **kwargs):
            # Call a paginated client read method until a short page signals the end.
            results = []
            offset = 0
            while True:
                page = await read(limit=page_size, offset=offset, **kwargs)
                results.extend(page)
                if len(page) < page_size:
                    return results
                offset += page_size

        task_run_cache, flow_run_cache, flow_cache = {}, {}, {}

        async def read_cached(cache, read, object_id):
            # Many logs share the same task/flow run; fetch each object only once.
            if object_id not in cache:
                cache[object_id] = await read(object_id)
            return cache[object_id]

        # Ensure any pending logs are sent
        await APILogHandler.aflush()

        # Collect this flow run plus every (nested) subflow run beneath it.
        flow_run_ids = [runtime.flow_run.id]
        frontier = list(flow_run_ids)
        while frontier:
            children = await read_all(
                client.read_flow_runs,
                flow_run_filter=FlowRunFilter(parent_flow_run_id={"any_": frontier}),
            )
            frontier = [child.id for child in children]
            flow_run_ids.extend(frontier)

        logs = await read_all(
            client.read_logs,
            log_filter=LogFilter(flow_run_id={"any_": flow_run_ids}),
        )
        for log in logs:
            # Gets task run and flow run info
            if log.task_run_id is not None:
                task_run = await read_cached(
                    task_run_cache, client.read_task_run, log.task_run_id
                )
                print("-------------------- Task Run Details ----------------------------")
                print(task_run)
                print("------------------------------------------------------------------")
                if task_run.flow_run_id is not None:
                    flow_run = await read_cached(
                        flow_run_cache, client.read_flow_run, task_run.flow_run_id
                    )
                    print("-------------------- Flow Run Details ----------------------------")
                    print(flow_run)
                    print("------------------------------------------------------------------")
                    if flow_run.flow_id is not None:
                        flow = await read_cached(
                            flow_cache, client.read_flow, flow_run.flow_id
                        )
                        print(f"Flow name is {flow.name}")
            print(log)
@flow(log_prints=True)
def daily_flow():
    """Run the daily subflow and task, then print the logs collected for this run."""
    ## A subflow
    some_subflow()
    ## A task
    some_task()
    # Resolve the future explicitly. Otherwise the flow may finish (and the
    # future be garbage-collected) before check_logs has run.
    check_logs.submit().wait()
    # NOTE: logs emitted inside some_subflow are stored under the subflow's own
    # flow run id, so check_logs must also query subflow runs to see them.
Marvin
07/01/2025, 9:25 AMMarvin
07/01/2025, 9:25 AMpython
from prefect import flow, task, get_client
from prefect.logging.handlers import APILogHandler
from prefect.client.schemas import LogFilter, TaskRunFilter, FlowRunFilter
from prefect.client.schemas.filters import FlowRunFilterId
import asyncio
@task
async def get_all_flow_run_logs(parent_flow_run_id: str):
    """Collect the logs of a flow run and of all of its nested subflow runs.

    Args:
        parent_flow_run_id: ID (str or UUID) of the root flow run.

    Returns:
        A list of dicts, one per log, in the order returned by the API. Each dict
        has ``timestamp``, ``message`` and ``level``, plus ``flow_name`` when the
        emitting flow run is in the tree, and ``task_name`` when the log came
        from a task run whose name could be resolved.
    """
    # The filter models live in the ``filters`` submodule.
    from prefect.client.schemas.filters import FlowRunFilter, LogFilter, TaskRunFilter

    # NOTE(review): assumed to equal the server's maximum page size
    # (PREFECT_API_DEFAULT_LIMIT, 200 by default) - confirm for your server.
    page_size = 200

    async with get_client() as client:

        async def read_all(read, **kwargs):
            # Call a paginated client read method until a short page signals the end.
            results = []
            offset = 0
            while True:
                page = await read(limit=page_size, offset=offset, **kwargs)
                results.extend(page)
                if len(page) < page_size:
                    return results
                offset += page_size

        # Ensure any pending logs are sent
        await APILogHandler.aflush()

        # Map every flow run in the tree (root + nested subflows) to its flow id.
        root = await client.read_flow_run(parent_flow_run_id)
        flow_id_by_run = {root.id: root.flow_id}
        frontier = [root.id]
        while frontier:
            children = await read_all(
                client.read_flow_runs,
                flow_run_filter=FlowRunFilter(parent_flow_run_id={"any_": frontier}),
            )
            flow_id_by_run.update((run.id, run.flow_id) for run in children)
            frontier = [run.id for run in children]

        # One lookup per distinct flow instead of one per log line.
        flow_name_by_id = {}
        for flow_id in set(flow_id_by_run.values()):
            flow_name_by_id[flow_id] = (await client.read_flow(flow_id)).name

        logs = await read_all(
            client.read_logs,
            log_filter=LogFilter(flow_run_id={"any_": list(flow_id_by_run)}),
        )

        # Resolve all task run names with a single (paginated) query.
        task_run_ids = list({log.task_run_id for log in logs if log.task_run_id is not None})
        task_name_by_id = {}
        if task_run_ids:
            task_runs = await read_all(
                client.read_task_runs,
                task_run_filter=TaskRunFilter(id={"any_": task_run_ids}),
            )
            task_name_by_id = {run.id: run.name for run in task_runs}

    formatted_logs = []
    for log in logs:
        entry = {"timestamp": log.timestamp}
        flow_id = flow_id_by_run.get(log.flow_run_id)
        if flow_id is not None:
            entry["flow_name"] = flow_name_by_id[flow_id]
        task_name = task_name_by_id.get(log.task_run_id)
        if task_name is not None:
            entry["task_name"] = task_name
        entry["message"] = log.message
        entry["level"] = log.level
        # Keep every log; the lookup results only enrich it.
        formatted_logs.append(entry)
    return formatted_logs
@flow
async def parent_flow():
    """Run the workload, then print every log from this run and its subflows in time order."""
    # The current flow run's id is exposed via prefect.runtime.
    from prefect import runtime

    # Your flow logic here
    await subflow()
    await some_task()
    # Get all logs at the end
    logs = await get_all_flow_run_logs(parent_flow_run_id=runtime.flow_run.id)
    # Example: Print formatted logs
    for log in sorted(logs, key=lambda x: x["timestamp"]):
        print(f"[{log['timestamp']}] {log.get('flow_name', 'FLOW')} - "
              f"{log.get('task_name', 'N/A')} - {log['message']}")
Key improvements in this approach:
1. It uses `parent_flow_run_id` to fetch the subflow runs associated with the parent flow
2. Collects logs from both the parent flow and all subflows
3. Organizes logs with relevant metadata (flow name, task name, timestamp)
4. Returns a structured format that you can easily use for email reports or other purposes
A few important notes:
- The logs are returned in a structured format that you can easily customize for your needs
- You might want to add error handling and pagination for large numbers of logs
- Consider adding filters for log levels if you only want specific types of logs
- The timestamps in the logs can be used to sort everything chronologically
If you need to generate an email report, you could easily extend this by adding a formatting function:
```python
def format_logs_forPav Staz
07/01/2025, 10:04 AMMarvin
07/01/2025, 10:04 AMMarvin
07/01/2025, 10:04 AM.submit()
with an async task in a synchronous flow. Here's how to modify the approach for a synchronous parent flow:
python
from prefect import flow, task, get_client
from prefect.logging.handlers import APILogHandler
from prefect.client.schemas import LogFilter, TaskRunFilter, FlowRunFilter
from prefect import runtime
@task
async def get_all_flow_run_logs(parent_flow_run_id: str):
    """Collect the logs of a flow run and of all of its nested subflow runs.

    Args:
        parent_flow_run_id: ID (str or UUID) of the root flow run.

    Returns:
        A list of dicts, one per log, in the order returned by the API. Each dict
        has ``timestamp``, ``message`` and ``level``, plus ``flow_name`` when the
        emitting flow run is in the tree, and ``task_name`` when the log came
        from a task run whose name could be resolved.
    """
    # The filter models live in the ``filters`` submodule.
    from prefect.client.schemas.filters import FlowRunFilter, LogFilter, TaskRunFilter

    # NOTE(review): assumed to equal the server's maximum page size
    # (PREFECT_API_DEFAULT_LIMIT, 200 by default) - confirm for your server.
    page_size = 200

    async with get_client() as client:

        async def read_all(read, **kwargs):
            # Call a paginated client read method until a short page signals the end.
            results = []
            offset = 0
            while True:
                page = await read(limit=page_size, offset=offset, **kwargs)
                results.extend(page)
                if len(page) < page_size:
                    return results
                offset += page_size

        # Ensure any pending logs are sent
        await APILogHandler.aflush()

        # Map every flow run in the tree (root + nested subflows) to its flow id.
        root = await client.read_flow_run(parent_flow_run_id)
        flow_id_by_run = {root.id: root.flow_id}
        frontier = [root.id]
        while frontier:
            children = await read_all(
                client.read_flow_runs,
                flow_run_filter=FlowRunFilter(parent_flow_run_id={"any_": frontier}),
            )
            flow_id_by_run.update((run.id, run.flow_id) for run in children)
            frontier = [run.id for run in children]

        # One lookup per distinct flow instead of one per log line.
        flow_name_by_id = {}
        for flow_id in set(flow_id_by_run.values()):
            flow_name_by_id[flow_id] = (await client.read_flow(flow_id)).name

        logs = await read_all(
            client.read_logs,
            log_filter=LogFilter(flow_run_id={"any_": list(flow_id_by_run)}),
        )

        # Resolve all task run names with a single (paginated) query.
        task_run_ids = list({log.task_run_id for log in logs if log.task_run_id is not None})
        task_name_by_id = {}
        if task_run_ids:
            task_runs = await read_all(
                client.read_task_runs,
                task_run_filter=TaskRunFilter(id={"any_": task_run_ids}),
            )
            task_name_by_id = {run.id: run.name for run in task_runs}

    formatted_logs = []
    for log in logs:
        entry = {"timestamp": log.timestamp}
        flow_id = flow_id_by_run.get(log.flow_run_id)
        if flow_id is not None:
            entry["flow_name"] = flow_name_by_id[flow_id]
        task_name = task_name_by_id.get(log.task_run_id)
        if task_name is not None:
            entry["task_name"] = task_name
        entry["message"] = log.message
        entry["level"] = log.level
        # Keep every log; the lookup results only enrich it.
        formatted_logs.append(entry)
    return formatted_logs
@flow
def parent_flow():
    """Run the workload, then fetch and print this run's logs in chronological order."""
    # Your flow logic here
    subflow()
    some_task()

    # The log collector is an async task; submitting it from this sync flow
    # returns a future, and .result() blocks until the logs are available.
    logs_future = get_all_flow_run_logs.submit(parent_flow_run_id=runtime.flow_run.id)
    collected = logs_future.result()

    chronological = sorted(collected, key=lambda entry: entry["timestamp"])
    for entry in chronological:
        flow_label = entry.get('flow_name', 'FLOW')
        task_label = entry.get('task_name', 'N/A')
        print(f"[{entry['timestamp']}] {flow_label} - {task_label} - {entry['message']}")
A few important points:
1. The async task can be submitted using `.submit()` in a synchronous flow; Prefect handles the async/sync conversion internally.
2. If you need the results immediately, you can call `.result()` on the future.
3. If you don't need the results immediately (e.g., if you're just sending them to another task), you can pass the future directly; Prefect resolves it before the downstream task runs.
You could also chain it with another task:
python
@task
def send_email_report(logs):
    """Format *logs* and send them as an email report.

    Placeholder: currently does nothing and returns ``None``.
    """
@flow
def parent_flow():
    """Run the workload, then pass the collected logs to the email report task."""
    subflow()
    some_task()
    logs_future = get_all_flow_run_logs.submit(parent_flow_run_id=runtime.flow_run.id)
    # Passing the future makes send_email_report depend on it; Prefect resolves
    # it before the report task runs. Waiting on the report keeps the flow from
    # finishing (and the future being garbage-collected) before the email is sent.
    send_email_report.submit(logs_future).wait()
This way, thePav Staz
07/01/2025, 10:55 AMMarvin
07/01/2025, 10:55 AMMarvin
07/01/2025, 10:55 AMPav Staz
07/01/2025, 2:58 PMMarvin
07/01/2025, 2:58 PMMarvin
07/01/2025, 2:58 PM<pre>
tags or preserve whitespace using CSS. Here's the modified version that will maintain your DBT log formatting:
python
from prefect import task
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
from datetime import datetime
import logging
import pytz
import html


@task
def format_logs_to_html(formatted_logs, flow_name: str) -> str:
    """Render structured log entries as an HTML report (e.g. an email body).

    Args:
        formatted_logs: Dicts with at least ``timestamp`` (a datetime) and
            ``message``; optionally ``level`` (a logging level name or number),
            ``flow_name`` and ``task_name``.
        flow_name: Name shown in the report header.

    Returns:
        A complete HTML document. Entries are sorted by timestamp and a new
        "Flow:" heading starts whenever the emitting flow changes. Message
        text is HTML-escaped and wrapped in ``<pre>`` to keep its whitespace.
    """
    level_colors = {
        "ERROR": "#ffebee",
        "WARNING": "#fff3e0",
        "INFO": "#f1f8e9",
        "DEBUG": "#f5f5f5"
    }
    sorted_logs = sorted(formatted_logs, key=lambda x: x["timestamp"])
    generated_at = datetime.now(pytz.UTC).strftime('%Y-%m-%d %H:%M:%S UTC')
    report_title = html.escape(flow_name)
    # Collect fragments and join once instead of repeated string +=.
    parts = [f"""
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background-color: #f8f9fa; padding: 20px; margin-bottom: 20px; }}
.log-entry {{ padding: 10px; margin: 5px 0; border-radius: 4px; }}
.timestamp {{ color: #666; font-size: 0.9em; }}
.flow-name {{ font-weight: bold; color: #2196F3; }}
.task-name {{ color: #673AB7; }}
.message {{
margin-top: 5px;
white-space: pre-wrap; /* Preserves whitespace and wraps */
font-family: monospace; /* Better for log output */
font-size: 12px;
}}
pre {{
margin: 0;
white-space: pre-wrap;
word-wrap: break-word;
font-family: monospace;
font-size: 12px;
}}
</style>
</head>
<body>
<div class="header">
<h2>Flow Run Report: {report_title}</h2>
<p>Generated at: {generated_at}</p>
</div>
"""]
    current_flow = None
    for log in sorted_logs:
        ts = log["timestamp"]
        if ts.tzinfo is not None:
            # Normalise aware timestamps so the "UTC" label below is accurate.
            ts = ts.astimezone(pytz.UTC)
        timestamp = ts.strftime("%Y-%m-%d %H:%M:%S UTC")

        level = log.get("level", "INFO")
        if isinstance(level, int):
            # Levels may arrive as logging numbers (e.g. 40) rather than names;
            # map them so the colour lookup can match.
            level = logging.getLevelName(level)
        bg_color = level_colors.get(level, "#ffffff")

        # Start a new flow section when the emitting flow changes.
        log_flow = log.get("flow_name")
        if log_flow and log_flow != current_flow:
            if current_flow is not None:
                parts.append("</div>")
            current_flow = log_flow
            parts.append(
                f'<h3>Flow: {html.escape(log_flow)}</h3><div class="flow-section">'
            )

        parts.append(f"""
<div class="log-entry" style="background-color: {bg_color}">
<span class="timestamp">{timestamp}</span>
""")
        if "task_name" in log:
            task_label = html.escape(str(log["task_name"]))
            parts.append(f'<br><span class="task-name">Task: {task_label}</span>')
        # Escape HTML entities and wrap message in pre tags
        escaped_message = html.escape(log["message"])
        parts.append(f"""
<div class="message"><pre>{escaped_message}</pre></div>
</div>
""")

    if current_flow is not None:
        parts.append("</div>")  # close the last flow-section only if one was opened
    parts.append("</body></html>")
    return "".join(parts)
Key changes made to preserve formatting:
1. Added `white-space: pre-wrap` to preserve whitespace while still allowing wrapping
2. Used `<pre>` tags around the message content
3. Added `html.escape()` to properly escape any HTML characters in the logs
4. Set `font-family: monospace` for better log readability
5. Added specific styling for `pre` tags to ensure consistent formatting
This should now properly display