Marvin
09/22/2025, 9:55 PM
You can pull a run’s logs out as plain text in a few ways:
- Flow runs:
prefect flow-run logs <FLOW_RUN_ID>
prefect flow-run logs <FLOW_RUN_ID> --tail -n 200 > flow_logs.txt
- Task runs:
prefect task-run logs <TASK_RUN_ID>
prefect task-run logs <TASK_RUN_ID> --reverse --head -n 100 > task_logs.txt
- Python client (export as plain text however you like):
```
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId

async def fetch_logs(flow_run_id: str):
    async with get_client() as client:
        logs = await client.read_logs(
            log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]))
        )
    for l in logs:
        # format to your liking
        print(f"{l.timestamp} {l.level.name} {l.name} - {l.message}")

asyncio.run(fetch_logs("<FLOW_RUN_ID>"))
```
- Make your logs “look” plain in the UI by using a simple formatter in your logging config. This won’t change the UI’s layout, but your messages will be plain. See logging customization.
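For example, a minimal way to apply a plain formatter (a sketch that assumes your Prefect version honors PREFECT_LOGGING_* environment-variable overrides of the keys in its default logging.yml; double-check the exact key names against your version’s logging docs):
```
import os

# Assumption: Prefect maps PREFECT_LOGGING_<PATH>_<TO>_<KEY> env vars onto its
# logging.yml, and the default config exposes flow_run_fmt/task_run_fmt under
# formatters.standard. Set these before the flow process starts.
plain_fmt = "%(asctime)s %(levelname)s %(name)s - %(message)s"
os.environ["PREFECT_LOGGING_FORMATTERS_STANDARD_FLOW_RUN_FMT"] = plain_fmt
os.environ["PREFECT_LOGGING_FORMATTERS_STANDARD_TASK_RUN_FMT"] = plain_fmt
```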
References:
- Add logging to flows/tasks
- Logging customization
- Logs API reference
If your goal is easy copy/paste or bulk export from the UI, tell me your use case—I can suggest a tailored script or we can open a feature request for a “Download logs” / “Plain text” view.
Jackson Nguyen
09/22/2025, 9:59 PM
Marvin
09/22/2025, 10:03 PM
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_markdown_artifact

async def logs_to_artifact(flow, flow_run, state):
    # Fetch all logs for this flow run
    async with get_client() as client:
        logs = await client.read_logs(
            log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id]))
        )

    # Sort and render as plain text
    logs = sorted(logs, key=lambda l: l.timestamp)
    lines = []
    for l in logs:
        # Some fields vary by environment, so be defensive
        ts = getattr(l, "timestamp", "")
        lvl = getattr(l, "level", "")
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")
    text = "\n".join(lines)

    # Wrap in a code fence to keep it as plain text in the UI
    create_markdown_artifact(
        markdown=f"```\n{text}\n```",
        key=f"{flow_run.id}-logs",
        description="Plaintext logs for this flow run"
    )

@flow(on_completion=logs_to_artifact, on_failure=logs_to_artifact)
def my_flow():
    # your flow code here
    pass
```
Notes and options:
- This attaches the artifact directly to the run (because it’s created inside the run via a hook).
- If your logs can be very large, consider:
- Chunking into multiple artifacts (e.g., create one artifact per N characters)
- Uploading the text to object storage (S3/GCS) and creating a link artifact pointing to it:
from prefect.artifacts import create_link_artifact
create_link_artifact(link="<presigned-url-or-static-url>", link_text="Download full logs", key=f"{flow_run.id}-logs")
- The filter by flow_run_id includes both flow- and task-run logs for that run (the sketch below shows how to tag each line with its task run ID).
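A small variant of the formatting loop for that (a sketch; it assumes each log record exposes an optional task_run_id field, which is worth verifying in your environment):
```
def format_lines_with_origin(logs):
    # Tag each line with its task run ID, or "flow" for flow-level records.
    # Assumption: the log model exposes task_run_id (None for flow-level logs).
    lines = []
    for l in sorted(logs, key=lambda l: getattr(l, "timestamp", None)):
        origin = getattr(l, "task_run_id", None) or "flow"
        lines.append(
            f"{getattr(l, 'timestamp', '')} {getattr(l, 'level', '')} "
            f"[{origin}] {getattr(l, 'name', '')} - {getattr(l, 'message', '')}"
        )
    return "\n".join(lines)
```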
Docs:
- Artifacts API (markdown/link/table)
If you want, I can adapt this to:
- Only include ERROR/CRITICAL logs
- Include task-run IDs or other fields
- Store in S3/GCS and link instead of embedding in a Markdown artifact
Tell me your expected log sizes and storage preferences, and I’ll tailor the example.
Jackson Nguyen
09/22/2025, 10:52 PM
Marvin
09/22/2025, 10:57 PM
read_logs returns logs in pages, so my quick example only got the first page. For ~10 MB logs, I recommend one of these patterns:
A) Chunked Markdown artifacts (keeps everything in Prefect)
- Creates multiple artifacts of plain text, each a manageable size
- Easy to click-through in the run UI
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_markdown_artifact

PAGE_SIZE = 1000        # logs per API call
CHUNK_CHARS = 500_000   # ~0.5MB per artifact block

async def create_full_log_artifacts(flow, flow_run, state):
    # Fetch all pages
    async with get_client() as client:
        offset = 0
        all_logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            all_logs.extend(batch)
            offset += len(batch)

    # Sort by timestamp and render as plain text
    all_logs.sort(key=lambda l: getattr(l, "timestamp", None))
    lines = []
    for l in all_logs:
        ts = getattr(l, "timestamp", "")
        lvl = getattr(l, "level", "")
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")
    text = "\n".join(lines)

    # Chunk into multiple Markdown artifacts so the UI can load them
    for i in range(0, len(text), CHUNK_CHARS):
        chunk = text[i:i+CHUNK_CHARS]
        create_markdown_artifact(
            markdown=f"```\n{chunk}\n```",
            key=f"{flow_run.id}-logs-{i//CHUNK_CHARS+1}",
            description="Plaintext logs (chunked)"
        )

@flow(on_completion=create_full_log_artifacts, on_failure=create_full_log_artifacts, on_crashed=create_full_log_artifacts)
def my_flow():
    # your flow here
    ...
```
B) Upload to object storage + link artifact (recommended for very large logs)
- Fast to view/download; avoids heavy UI rendering of huge Markdown
- Example below uses S3 with gzip
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_link_artifact
import io, gzip, asyncio, boto3

PAGE_SIZE = 1000

async def create_log_link_artifact(flow, flow_run, state, bucket: str, key_prefix: str):
    # Fetch all logs (pagination)
    async with get_client() as client:
        offset = 0
        logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            logs.extend(batch)
            offset += len(batch)

    # Format plaintext
    logs.sort(key=lambda l: getattr(l, "timestamp", None))
    text = "\n".join(
        f"{getattr(l,'timestamp','')} {getattr(l,'level','')} {getattr(l,'name','')} - {getattr(l,'message','')}"
        for l in logs
    )

    # Gzip and upload
    s3 = boto3.client("s3")
    key = f"{key_prefix}/{flow_run.id}.log.gz"
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as f:
        f.write(text.encode("utf-8"))
    buf.seek(0)
    s3.upload_fileobj(buf, bucket, key, ExtraArgs={"ContentType": "text/plain", "ContentEncoding": "gzip"})

    # Presigned URL and artifact
    url = s3.generate_presigned_url(
        ClientMethod="get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=7 * 24 * 3600,  # 7 days
    )
    create_link_artifact(
        link=url,
        link_text="Download full logs (gzip)",
        key=f"{flow_run.id}-logs",
    )
```
Jackson Nguyen
09/22/2025, 11:20 PM
Marvin
09/22/2025, 11:21 PM
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_markdown_artifact

PAGE_SIZE = 1000          # logs per API call; adjust if needed
CHUNK_CHARS = 1_000_000   # ~1MB per artifact chunk; adjust for your UI performance

async def full_log_to_chunked_artifacts(flow, flow_run, state):
    # 1) Page through all logs for this run
    async with get_client() as client:
        offset = 0
        all_logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            all_logs.extend(batch)
            offset += len(batch)

    # 2) Sort and format as plain text
    all_logs.sort(key=lambda l: getattr(l, "timestamp", None))
    lines = []
    for l in all_logs:
        ts = getattr(l, "timestamp", "")
        lvl = str(getattr(l, "level", ""))
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")
    text = "\n".join(lines)

    # 3) Emit chunked Markdown artifacts (rendered inline in the UI)
    if not text:
        create_markdown_artifact(
            markdown="```\n<no logs found>\n```",
            key=f"{flow_run.id}-logs-1",
            description="Plaintext logs (chunk 1)"
        )
        return

    for i in range(0, len(text), CHUNK_CHARS):
        chunk = text[i:i+CHUNK_CHARS]
        chunk_num = (i // CHUNK_CHARS) + 1
        create_markdown_artifact(
            markdown=f"```\n{chunk}\n```",
            key=f"{flow_run.id}-logs-{chunk_num}",
            description=f"Plaintext logs (chunk {chunk_num})"
        )

@flow(
    on_completion=full_log_to_chunked_artifacts,
    on_failure=full_log_to_chunked_artifacts,
    on_crashed=full_log_to_chunked_artifacts,
)
def my_flow():
    # your flow here
    ...
```
Why this works for your needs:
- It paginates through the entire log set so nothing is missed.
- It renders as plain text in the Prefect UI (no download needed).
- Chunking avoids UI slowdowns with very large single artifacts. With ~10 MB logs and 1 MB chunks, you’ll see about 10 artifacts on the run page you can click through quickly.
If you truly need a single on-page artifact, you can set CHUNK_CHARS to a very large value to force one artifact. Just note very large Markdown payloads can render slowly or be clipped by browser/DB constraints; chunking is more reliable.
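As a quick sanity check on how many chunk artifacts to expect (plain arithmetic, nothing Prefect-specific):
```
import math

log_size_chars = 10_000_000   # ~10 MB of plaintext logs
chunk_chars = 1_000_000       # CHUNK_CHARS from the example above
print(math.ceil(log_size_chars / chunk_chars))  # -> 10 chunk artifacts
```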
References:
- Artifacts API
- Logs API/models
If you want, I can:
- Add a top-level “index” artifact that lists and links to each chunk
- Filter to only ERROR/CRITICAL lines or include task run IDs (a quick sketch of the level filter follows this list)
- Tune the page size/chunk size for your environment (Server vs Cloud)
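For the ERROR/CRITICAL-only option, a minimal client-side filter you could apply before formatting (a sketch; it assumes the records carry standard numeric logging levels, so adjust the conversion if yours expose an enum):
```
import logging

def only_errors(logs):
    # Keep records at ERROR (40) or above; treat missing/odd levels as 0.
    kept = []
    for l in logs:
        try:
            level = int(getattr(l, "level", 0) or 0)
        except (TypeError, ValueError):
            level = 0
        if level >= logging.ERROR:
            kept.append(l)
    return kept
```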
Jackson Nguyen
09/22/2025, 11:40 PM
Marvin
09/22/2025, 11:41 PM
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_markdown_artifact
from prefect.runtime import flow_run as rt_flow_run

PAGE_SIZE = 1000          # logs per API call
CHUNK_CHARS = 1_000_000   # ~1 MB of text per artifact chunk; adjust as needed

def _fmt_level(lvl):
    # lvl may be an enum or string; normalize to a string
    if hasattr(lvl, "name"):
        return lvl.name
    return "" if lvl is None else str(lvl)

async def full_log_to_chunked_artifacts_with_index(flow, flow_run, state):
    # 1) Fetch ALL logs with pagination
    async with get_client() as client:
        offset = 0
        all_logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            all_logs.extend(batch)
            offset += len(batch)

    # 2) Sort and format as plaintext
    all_logs.sort(key=lambda l: getattr(l, "timestamp", None))
    lines = []
    for l in all_logs:
        ts = getattr(l, "timestamp", "")
        lvl = _fmt_level(getattr(l, "level", None))
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")
    text = "\n".join(lines)

    # 3) Create chunked Markdown artifacts so they render inline in the UI
    chunk_meta = []  # index metadata: [{'num': 1, 'key': '...', 'id': UUID, 'size': 12345}, ...]
    if not text:
        art_id = create_markdown_artifact(
            markdown="```\n<no logs found>\n```",
            key=f"{flow_run.id}-logs-1",
            description="Plaintext logs (chunk 1)",
        )
        chunk_meta.append({"num": 1, "key": f"{flow_run.id}-logs-1", "id": str(art_id), "size": 0})
    else:
        chunk_num = 0
        for i in range(0, len(text), CHUNK_CHARS):
            chunk_num += 1
            chunk = text[i:i+CHUNK_CHARS]
            key = f"{flow_run.id}-logs-{chunk_num}"
            art_id = create_markdown_artifact(
                markdown=f"```\n{chunk}\n```",
                key=key,
                description=f"Plaintext logs (chunk {chunk_num})",
            )
            chunk_meta.append({"num": chunk_num, "key": key, "id": str(art_id), "size": len(chunk)})

    # 4) Create an index artifact that summarizes all chunks
    run_url = ""
    try:
        run_url = rt_flow_run.ui_url() or ""
    except Exception:
        pass

    index_lines = []
    index_lines.append(f"# Log index for flow run {flow_run.id}")
    if run_url:
        index_lines.append(f"- Run UI: {run_url}")
    index_lines.append("")
    index_lines.append("The following chunks are available in the Artifacts panel for this run:")
    index_lines.append("")
    index_lines.append("| Chunk | Key | Artifact ID | Size (chars) |")
    index_lines.append("| --- | --- | --- | --- |")
    for m in chunk_meta:
        index_lines.append(f"| {m['num']} | `{m['key']}` | `{m['id']}` | {m['size']} |")

    create_markdown_artifact(
        markdown="\n".join(index_lines),
        key=f"{flow_run.id}-logs-index",
        description="Index for chunked plaintext logs",
    )

@flow(
    on_completion=full_log_to_chunked_artifacts_with_index,
    on_failure=full_log_to_chunked_artifacts_with_index,
    on_crashed=full_log_to_chunked_artifacts_with_index,
)
def my_flow():
    # your flow here
    ...
```
How to use it
- Register the three hooks on your flow as shown above; no other wiring is needed.
- After a run finishes (or fails/crashes), open the run page in the UI: the -logs-index artifact lists every chunk, and each numbered -logs-N artifact renders the plaintext inline.
Jackson Nguyen
09/22/2025, 11:51 PM
Marvin
09/23/2025, 12:04 AM
S3 example (presigned URL, inline view)
```
from datetime import timedelta
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_link_artifact
import asyncio
import boto3

PAGE_SIZE = 1000

def format_logs_plaintext(logs):
    logs.sort(key=lambda l: getattr(l, "timestamp", None))
    lines = []
    for l in logs:
        ts = getattr(l, "timestamp", "")
        lvl = getattr(getattr(l, "level", None), "name", str(getattr(l, "level", "")))
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")
    return "\n".join(lines)

async def save_logs_to_s3_and_link(flow, flow_run, state,
                                   bucket: str = "my-bucket",
                                   key_prefix: str = "prefect/logs",
                                   expires_in: int = 7 * 24 * 3600):
    # 1) Fetch ALL logs (pagination)
    async with get_client() as client:
        offset = 0
        all_logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            all_logs.extend(batch)
            offset += len(batch)

    # 2) Format plaintext
    text = format_logs_plaintext(all_logs) or "<no logs found>"

    # 3) Upload as text/plain with inline disposition
    s3 = boto3.client("s3")
    key = f"{key_prefix}/{flow_run.id}.txt"
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=text.encode("utf-8"),
        ContentType="text/plain",
        ContentDisposition="inline",
    )

    # 4) Presign with explicit inline headers for browsers
    url = s3.generate_presigned_url(
        ClientMethod="get_object",
        Params={
            "Bucket": bucket, "Key": key,
            "ResponseContentType": "text/plain",
            "ResponseContentDisposition": "inline"
        },
        ExpiresIn=expires_in,
    )

    # 5) Create link artifact on the run
    create_link_artifact(
        link=url,
        link_text="View full logs (text)",
        key=f"{flow_run.id}-logs-link",
        description="Full plaintext logs stored in S3"
    )

@flow(
    on_completion=lambda f, fr, s: asyncio.run(save_logs_to_s3_and_link(f, fr, s)),
    on_failure=lambda f, fr, s: asyncio.run(save_logs_to_s3_and_link(f, fr, s)),
    on_crashed=lambda f, fr, s: asyncio.run(save_logs_to_s3_and_link(f, fr, s)),
)
def my_flow():
    # your flow here
    ...
```
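If asyncio.run inside those lambdas ever collides with an already-running event loop in your environment, you can keep the hook itself async (as the earlier examples do) and bind the extra arguments in a thin wrapper instead. A sketch, reusing save_logs_to_s3_and_link and the imports from the block above:
```
# Assumption: Prefect awaits async state-change hooks (the earlier examples
# rely on this too), so a small coroutine wrapper can replace the lambdas.
async def s3_logs_hook(flow, flow_run, state):
    await save_logs_to_s3_and_link(
        flow, flow_run, state,
        bucket="my-bucket",          # placeholder bucket
        key_prefix="prefect/logs",   # placeholder prefix
    )

@flow(on_completion=s3_logs_hook, on_failure=s3_logs_hook, on_crashed=s3_logs_hook)
def my_flow():
    ...
```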
GCS example (signed URL, inline view)
```
from datetime import timedelta
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_link_artifact
from google.cloud import storage
import asyncio

PAGE_SIZE = 1000

def format_logs_plaintext(logs):
    logs.sort(key=lambda l: getattr(l, "timestamp", None))
    return "\n".join(
        f"{getattr(l,'timestamp','')} "
        f"{getattr(getattr(l,'level',None),'name',str(getattr(l,'level','')))} "
        f"{getattr(l,'name','')} - {getattr(l,'message','')}"
        for l in logs
    ) or "<no logs found>"

async def save_logs_to_gcs_and_link(flow, flow_run, state,
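                                    # The original snippet breaks off at this signature; the parameters
                                    # and body below are a reconstructed sketch that mirrors the S3
                                    # example, assuming the standard google-cloud-storage client and
                                    # credentials that can sign V4 URLs.
                                    bucket_name: str = "my-bucket",     # placeholder
                                    blob_prefix: str = "prefect/logs",  # placeholder
                                    expires_hours: int = 168):          # 7 days
    # Fetch ALL logs (same pagination loop as the S3 example)
    async with get_client() as client:
        offset = 0
        logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            logs.extend(batch)
            offset += len(batch)
    text = format_logs_plaintext(logs)

    # Upload as text/plain, then sign a V4 URL that browsers render inline
    blob = storage.Client().bucket(bucket_name).blob(f"{blob_prefix}/{flow_run.id}.txt")
    blob.upload_from_string(text, content_type="text/plain")
    url = blob.generate_signed_url(
        version="v4",
        expiration=timedelta(hours=expires_hours),
        method="GET",
        response_type="text/plain",
        response_disposition="inline",
    )

    create_link_artifact(
        link=url,
        link_text="View full logs (text)",
        key=f"{flow_run.id}-logs-link",
        description="Full plaintext logs stored in GCS",
    )
```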
Marvin
09/23/2025, 12:04 AM
Azure Blob example (SAS URL, inline view)
```
from datetime import datetime, timedelta
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_link_artifact
from azure.storage.blob import (
    BlobServiceClient, ContentSettings,
    generate_blob_sas, BlobSasPermissions
)
import asyncio

PAGE_SIZE = 1000

def format_logs_plaintext(logs):
    logs.sort(key=lambda l: getattr(l, "timestamp", None))
    return "\n".join(
        f"{getattr(l,'timestamp','')} "
        f"{getattr(getattr(l,'level',None),'name',str(getattr(l,'level','')))} "
        f"{getattr(l,'name','')} - {getattr(l,'message','')}"
        for l in logs
    ) or "<no logs found>"

async def save_logs_to_azure_and_link(flow, flow_run, state,
                                      connection_string: str,
                                      container: str = "logs",
                                      blob_prefix: str = "prefect/logs",
                                      expires_hours: int = 168,  # 7 days
                                      account_name: str = None,
                                      account_key: str = None):
    async with get_client() as client:
        offset = 0
        logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            logs.extend(batch)
            offset += len(batch)
    text = format_logs_plaintext(logs)

    svc = BlobServiceClient.from_connection_string(connection_string)
    blob_name = f"{blob_prefix}/{flow_run.id}.txt"
    blob_client = svc.get_blob_client(container=container, blob=blob_name)
    blob_client.upload_blob(
        text.encode("utf-8"),
        overwrite=True,
        content_settings=ContentSettings(
            content_type="text/plain",
            content_disposition="inline",
        ),
    )

    # Generate SAS URL (requires account_name/account_key)
    sas = generate_blob_sas(
        account_name=account_name,
        container_name=container,
        blob_name=blob_name,
        account_key=account_key,
        permission=BlobSasPermissions(read=True),
        expiry=datetime.utcnow() + timedelta(hours=expires_hours),
    )
    # Append the SAS token to the blob URL and attach a link artifact to the run
    url = f"{blob_client.url}?{sas}"
    create_link_artifact(
        link=url,
        link_text="View full logs (text)",
        key=f"{flow_run.id}-logs-link",
        description="Full plaintext logs stored in Azure Blob Storage",
    )
```