Ravish Mallya
07/23/2025, 6:22 AMMarvin
07/23/2025, 6:22 AMMarvin
07/23/2025, 6:24 AMpython
from prefect.client import get_client
from prefect.logging.loggers import PrefectLogAdapter
async def get_flow_run_logs(flow_run_id: str):
async with get_client() as client:
# Fetch logs for the specific flow run
logs = await client.read_logs(
log_filter={
"flow_run_id": {"any_": [flow_run_id]},
}
)
return logs
You can use this function asynchronously to fetch logs for a specific flow run. The logs will be returned as a list of log entries.
A few important notes:
1. This is an async function, so you'll need to run it in an async context
2. The logs are returned in chronological order
3. You can add additional filters to the log_filter
parameter to narrow down the logs you want to see
Here's a more complete example showing how to use it:
python
import asyncio
from prefect.client import get_client
async def get_flow_run_logs(flow_run_id: str):
async with get_client() as client:
# You can add additional filters like level if needed
logs = await client.read_logs(
log_filter={
"flow_run_id": {"any_": [flow_run_id]},
# Optionally filter by log level
# "level": {"ge_": 20} # INFO level and above
}
)
return logs
# Example usage
async def main():
flow_run_id = "your-flow-run-id"
logs = await get_flow_run_logs(flow_run_id)
for log in logs:
print(f"{log.timestamp} - {log.message}")
# Run the async function
if __name__ == "__main__":
asyncio.run(main())
The logs returned will include various fields such as:
- `timestamp`: When the log was created
- `message`: The actual log message
- `level`: The log level (e.g., INFO, ERROR)
- `flow_run_id`: The ID of the flow run
- `task_run_id`: The ID of the task run (if applicable)
You can also access logs through the REST API endpoint at /api/logs/filter
if you prefer to use HTTP requests directly, but the client method shown above is the recommended approach when working with Python.