<@ULVA73B9P> Is there a way to display logs as pla...
# ask-marvin
j
@Marvin Is there a way to display logs as plaintext in the prefect server UI?
m
Error: GitHub API error 422: {"message":"Validation Failed","errors":[{"message":"The listed users and repositories cannot be searched either because the resources do not exist or you do not have permission to view them.","resource":"Search","field":"q","code":"invalid"}],"documentation_url":"https://docs.github.com/v3/search/","status":"422"}
Sorry, I encountered an error while processing your request. Please try again.
j
@Marvin Can you try again?
m
thought for 8.8 seconds
Happy to! What would you like me to try again? If it’s about a Prefect task/flow/deployment, please share:
- What you’re trying to do and what went wrong
- Prefect version (I’ll assume 3.x unless you say otherwise)
- Code snippet or minimal repro (use ``` for code)
- Any CLI commands you ran and their output
- How/where you’re running (local, Prefect Cloud, self-hosted)
- Worker type + work pool, and infra (Process, Docker, Kubernetes)
- Relevant logs, error messages, or a run ID/link

If you want me to re-check a CLI command or produce a working example, tell me the exact command or scenario and I’ll verify and provide the corrected steps.
j
@Marvin Is there a way to display logs as plaintext in the prefect server UI?
m
thought for 225.9 seconds
Short answer: not in the UI today. In Prefect 3.x there isn’t a toggle to render logs as raw/plain text in the Server (or Cloud) UI.

Workarounds:
- CLI (plain text): we verified these commands exist
- Flow runs:
```
prefect flow-run logs <FLOW_RUN_ID>
prefect flow-run logs <FLOW_RUN_ID> --tail -n 200 > flow_logs.txt
```
- Task runs:
```
prefect task-run logs <TASK_RUN_ID>
prefect task-run logs <TASK_RUN_ID> --reverse --head -n 100 > task_logs.txt
```
- Python client (export as plain text however you like):
```
import asyncio
import logging

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId

async def fetch_logs(flow_run_id: str):
    async with get_client() as client:
        logs = await client.read_logs(
            log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]))
        )
        for log in logs:
            # format to your liking; log.level is a numeric logging level
            print(f"{log.timestamp} {logging.getLevelName(log.level)} {log.name} - {log.message}")

asyncio.run(fetch_logs("<FLOW_RUN_ID>"))
```
- Make your logs “look” plain in the UI by using a simple formatter in your logging config. This won’t change the UI’s layout, but your messages will be plain. See logging customization; a minimal sketch follows at the end of this message.

References:
- Add logging to flows/tasks
- Logging customization
- Logs API reference

If your goal is easy copy/paste or bulk export from the UI, tell me your use case—I can suggest a tailored script or we can open a feature request for a “Download logs” / “Plain text” view.
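A minimal sketch of that logging customization, assuming the documented PREFECT_LOGGING_SETTINGS_PATH setting (the usual flow is to copy Prefect’s default logging.yml, simplify its formatter format strings, and point Prefect at the edited copy; the path below is hypothetical):
```
import os

# Hypothetical path to an edited copy of Prefect's default logging.yml, with
# the formatter format strings reduced to something plain, e.g.
# "%(asctime)s %(levelname)s %(name)s - %(message)s".
os.environ["PREFECT_LOGGING_SETTINGS_PATH"] = os.path.expanduser(
    "~/.prefect/logging.yml"
)
```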
j
@Marvin The logs will be examined for any errors that might have occurred. The UI creates a lot of empty whitespace, and doesn't load the entire log immediately, requiring scrolling. Is there a way I can create the plaintext of the entire log as an artifact for that run?
m
thought for 247.4 seconds
Yes—great idea. You can generate a single plaintext “full log” artifact for each run. The simplest way is to add a flow hook that, on completion or failure, fetches all logs for the current flow run, formats them as plain text, and publishes a Markdown artifact (wrapped in a code block so whitespace is preserved). Here’s a drop-in example for Prefect 3.x:
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_markdown_artifact

async def logs_to_artifact(flow, flow_run, state):
    # Fetch all logs for this flow run
    async with get_client() as client:
        logs = await client.read_logs(
            log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id]))
        )

    # Sort and render as plain text
    logs = sorted(logs, key=lambda l: l.timestamp)
    lines = []
    for l in logs:
        # Some fields vary by environment, so be defensive
        ts = getattr(l, "timestamp", "")
        lvl = getattr(l, "level", "")
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")

    text = "\n".join(lines)

    # Wrap in a code fence to keep it as plain text in the UI
    create_markdown_artifact(
        markdown=f"```\n{text}\n```",
        key=f"{flow_run.id}-logs",
        description="Plaintext logs for this flow run"
    )

@flow(on_completion=logs_to_artifact, on_failure=logs_to_artifact)
def my_flow():
    # your flow code here
    pass
```
Notes and options:
- This attaches the artifact directly to the run (because it’s created inside the run via a hook).
- If your logs can be very large, consider:
  - Chunking into multiple artifacts (e.g., create one artifact per N characters)
  - Uploading the text to object storage (S3/GCS) and creating a link artifact pointing to it:
```
from prefect.artifacts import create_link_artifact

create_link_artifact(
    link="<presigned-url-or-static-url>",
    link_text="Download full logs",
    key=f"{flow_run.id}-logs",
)
```
- The filter by `flow_run_id` includes both flow- and task-run logs for that run.

Docs:
- Artifacts API (markdown/link/table)

If you want, I can adapt this to:
- Only include ERROR/CRITICAL logs
- Include task-run IDs or other fields (see the sketch below)
- Store in S3/GCS and link instead of embedding in a Markdown artifact

Tell me your expected log sizes and storage preferences, and I’ll tailor the example.
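For the task-run IDs option, a minimal sketch of a line formatter, assuming the `Log` objects returned by `read_logs` expose an optional `task_run_id` field (worth confirming on your Prefect version):
```
import logging

def format_line(log) -> str:
    # task_run_id is typically None for flow-level records
    task_id = getattr(log, "task_run_id", None)
    prefix = f"[task {task_id}] " if task_id else ""
    level = logging.getLevelName(getattr(log, "level", 0))
    return f"{log.timestamp} {level} {log.name} - {prefix}{log.message}"
```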
j
@Marvin I tried the sample code you gave me. It worked with some small edits, but it unfortunately did not retrieve the entire log. Also, my expected log sizes are around 10 MB.
m
thought for 267.1 seconds
Thanks for the details. What you’re seeing is pagination: `read_logs` returns logs in pages, so my quick example only got the first page. For ~10 MB logs, I recommend one of these patterns:

A) Chunked Markdown artifacts (keeps everything in Prefect)
- Creates multiple artifacts of plain text, each a manageable size
- Easy to click through in the run UI
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_markdown_artifact

PAGE_SIZE = 1000          # logs per API call
CHUNK_CHARS = 500_000     # ~0.5MB per artifact block

async def create_full_log_artifacts(flow, flow_run, state):
    # Fetch all pages
    async with get_client() as client:
        offset = 0
        all_logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            all_logs.extend(batch)
            offset += len(batch)

    # Sort by timestamp and render as plain text
    all_logs.sort(key=lambda l: getattr(l, "timestamp", None))
    lines = []
    for l in all_logs:
        ts = getattr(l, "timestamp", "")
        lvl = getattr(l, "level", "")
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")
    text = "\n".join(lines)

    # Chunk into multiple Markdown artifacts so the UI can load them
    for i in range(0, len(text), CHUNK_CHARS):
        chunk = text[i:i+CHUNK_CHARS]
        create_markdown_artifact(
            markdown=f"```\n{chunk}\n```",
            key=f"{flow_run.id}-logs-{i//CHUNK_CHARS+1}",
            description="Plaintext logs (chunked)"
        )

@flow(on_completion=create_full_log_artifacts, on_failure=create_full_log_artifacts, on_crashed=create_full_log_artifacts)
def my_flow():
    # your flow here
    ...
```
B) Upload to object storage + link artifact (recommended for very large logs)
- Fast to view/download; avoids heavy UI rendering of huge Markdown
- Example below uses S3 with gzip
```
import asyncio
import gzip
import io

import boto3

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_link_artifact

PAGE_SIZE = 1000

async def create_log_link_artifact(flow, flow_run, state, bucket: str, key_prefix: str):
    # Fetch all logs (pagination)
    async with get_client() as client:
        offset = 0
        logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            logs.extend(batch)
            offset += len(batch)

    # Format plaintext
    logs.sort(key=lambda l: getattr(l, "timestamp", None))
    text = "\n".join(
        f"{getattr(l, 'timestamp', '')} {getattr(l, 'level', '')} {getattr(l, 'name', '')} - {getattr(l, 'message', '')}"
        for l in logs
    )

    # Gzip and upload
    s3 = boto3.client("s3")
    key = f"{key_prefix}/{flow_run.id}.log.gz"
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as f:
        f.write(text.encode("utf-8"))
    buf.seek(0)
    s3.upload_fileobj(
        buf, bucket, key,
        ExtraArgs={"ContentType": "text/plain", "ContentEncoding": "gzip"},
    )

    # Presigned URL and artifact
    url = s3.generate_presigned_url(
        ClientMethod="get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=7 * 24 * 3600,  # 7 days
    )
    create_link_artifact(
        link=url,
        link_text="Download full logs (gzip)",
        key=f"{flow_run.id}-logs",
        description="Full plaintext logs stored in S3 (gzipped)",
    )

@flow(
    on_completion=lambda f, fr, s: asyncio.run(create_log_link_artifact(f, fr, s, "my-bucket", "prefect/logs")),
    on_failure=lambda f, fr, s: asyncio.run(create_log_link_artifact(f, fr, s, "my-bucket", "prefect/logs")),
    on_crashed=lambda f, fr, s: asyncio.run(create_log_link_artifact(f, fr, s, "my-bucket", "prefect/logs")),
)
def my_flow():
    ...
```
Why this helps:
- Paginates through all logs so nothing is missed
- Produces either chunked Markdown artifacts or a single downloadable file
- Keeps whitespace intact by using a code fence for Markdown

References:
- Artifacts API
- Logs models/API

Questions to tailor this for you:
- Are you on Cloud or self-hosted Server?
- Do you prefer chunked artifacts in the UI, or a single download link?
- If a link, which storage do you use (S3, GCS, Azure)?

I’ll adjust the snippet accordingly.
j
@Marvin I would prefer a solution where I can view the entire source without having to download. Is it possible to store the files in some store and then have them be viewable through Prefect Server UI?
m
thought for 69.8 seconds
Short answer: to view inline in the Prefect Server UI, you need to use artifacts. The UI can’t render files from external storage inline (link artifacts open externally). For large logs, the best in-UI approach is to create Markdown artifacts that contain the logs as plain text (in code fences). For ~10 MB logs, create them in chunks so the page stays responsive and nothing is truncated.

Here’s a robust, paginated hook that fetches ALL logs and emits chunked plaintext artifacts:
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_markdown_artifact

PAGE_SIZE = 1000         # logs per API call; adjust if needed
CHUNK_CHARS = 1_000_000  # ~1MB per artifact chunk; adjust for your UI performance

async def full_log_to_chunked_artifacts(flow, flow_run, state):
    # 1) Page through all logs for this run
    async with get_client() as client:
        offset = 0
        all_logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            all_logs.extend(batch)
            offset += len(batch)

    # 2) Sort and format as plain text
    all_logs.sort(key=lambda l: getattr(l, "timestamp", None))
    lines = []
    for l in all_logs:
        ts = getattr(l, "timestamp", "")
        lvl = str(getattr(l, "level", ""))
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")
    text = "\n".join(lines)

    # 3) Emit chunked Markdown artifacts (rendered inline in the UI)
    if not text:
        create_markdown_artifact(
            markdown="```\n<no logs found>\n```",
            key=f"{flow_run.id}-logs-1",
            description="Plaintext logs (chunk 1)"
        )
        return

    for i in range(0, len(text), CHUNK_CHARS):
        chunk = text[i:i+CHUNK_CHARS]
        chunk_num = (i // CHUNK_CHARS) + 1
        create_markdown_artifact(
            markdown=f"```\n{chunk}\n```",
            key=f"{flow_run.id}-logs-{chunk_num}",
            description=f"Plaintext logs (chunk {chunk_num})"
        )

@flow(
    on_completion=full_log_to_chunked_artifacts,
    on_failure=full_log_to_chunked_artifacts,
    on_crashed=full_log_to_chunked_artifacts,
)
def my_flow():
    # your flow here
    ...
```
Why this works for your needs:
- It paginates through the entire log set so nothing is missed.
- It renders as plain text in the Prefect UI (no download needed).
- Chunking avoids UI slowdowns with very large single artifacts.

With ~10 MB logs and 1 MB chunks, you’ll see about 10 artifacts on the run page you can click through quickly. If you truly need a single on-page artifact, you can set CHUNK_CHARS to a very large value to force one artifact. Just note very large Markdown payloads can render slowly or be clipped by browser/DB constraints; chunking is more reliable.

References:
- Artifacts API
- Logs API/models

If you want, I can:
- Add a top-level “index” artifact that lists and links to each chunk
- Filter to only ERROR/CRITICAL lines or include task run IDs
- Tune the page size/chunk size for your environment (Server vs Cloud)
j
@Marvin I would like to see the example with an added "index" artifact
m
thought for 66.3 seconds
Absolutely — here’s an example that creates:
- Chunked plaintext log artifacts (rendered inline in the UI)
- A separate “index” artifact that lists each chunk with its key, artifact ID, and size

It paginates through all logs so nothing is missed and builds a readable index you can use from the run page’s Artifacts panel.
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_markdown_artifact
from prefect.runtime import flow_run as rt_flow_run

PAGE_SIZE   = 1000        # logs per API call
CHUNK_CHARS = 1_000_000   # ~1 MB of text per artifact chunk; adjust as needed

def _fmt_level(lvl):
    # lvl may be an enum or string; normalize to a string
    if hasattr(lvl, "name"):
        return lvl.name
    return "" if lvl is None else str(lvl)

async def full_log_to_chunked_artifacts_with_index(flow, flow_run, state):
    # 1) Fetch ALL logs with pagination
    async with get_client() as client:
        offset = 0
        all_logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            all_logs.extend(batch)
            offset += len(batch)

    # 2) Sort and format as plaintext
    all_logs.sort(key=lambda l: getattr(l, "timestamp", None))
    lines = []
    for l in all_logs:
        ts = getattr(l, "timestamp", "")
        lvl = _fmt_level(getattr(l, "level", None))
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")
    text = "\n".join(lines)

    # 3) Create chunked Markdown artifacts so they render inline in the UI
    chunk_meta = []  # collect metadata for the index: [{'num': 1, 'key': '...', 'id': UUID, 'size': 12345}, ...]
    if not text:
        art_id = create_markdown_artifact(
            markdown="```\n<no logs found>\n```",
            key=f"{flow_run.id}-logs-1",
            description="Plaintext logs (chunk 1)",
        )
        chunk_meta.append({"num": 1, "key": f"{flow_run.id}-logs-1", "id": str(art_id), "size": 0})
    else:
        chunk_num = 0
        for i in range(0, len(text), CHUNK_CHARS):
            chunk_num += 1
            chunk = text[i:i+CHUNK_CHARS]
            key = f"{flow_run.id}-logs-{chunk_num}"
            art_id = create_markdown_artifact(
                markdown=f"```\n{chunk}\n```",
                key=key,
                description=f"Plaintext logs (chunk {chunk_num})",
            )
            chunk_meta.append({"num": chunk_num, "key": key, "id": str(art_id), "size": len(chunk)})

    # 4) Create an index artifact that summarizes all chunks
    run_url = ""
    try:
        run_url = rt_flow_run.ui_url() or ""
    except Exception:
        pass

    index_lines = []
    index_lines.append(f"# Log index for flow run {flow_run.id}")
    if run_url:
        index_lines.append(f"- Run UI: {run_url}")
    index_lines.append("")
    index_lines.append("The following chunks are available in the Artifacts panel for this run:")
    index_lines.append("")
    index_lines.append("| Chunk | Key | Artifact ID | Size (chars) |")
    index_lines.append("| --- | --- | --- | --- |")
    for m in chunk_meta:
        index_lines.append(f"| {m['num']} | `{m['key']}` | `{m['id']}` | {m['size']} |")

    create_markdown_artifact(
        markdown="\n".join(index_lines),
        key=f"{flow_run.id}-logs-index",
        description="Index for chunked plaintext logs",
    )

@flow(
    on_completion=full_log_to_chunked_artifacts_with_index,
    on_failure=full_log_to_chunked_artifacts_with_index,
    on_crashed=full_log_to_chunked_artifacts_with_index,
)
def my_flow():
    # your flow here
    ...
```
How to use it
- The “index” artifact will show a table of all chunks with their keys and IDs.
- The chunk artifacts are rendered inline as plain text (no download).
- Open your flow run in the Prefect UI and use the Artifacts panel; you’ll see the index plus each chunk listed by key.

Notes
- Increase CHUNK_CHARS if you prefer fewer larger chunks; decrease if the UI feels sluggish.
- You can filter logs (e.g., only ERROR/CRITICAL) by adding a LogFilterLevel to the LogFilter; see the sketch below.
- If you later discover stable deep-links to specific artifacts in your Server UI build, you can update the index to include clickable links; Prefect’s UI does not expose a guaranteed per-artifact URL today.

Docs
- Artifacts API
- Logs API/models
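For the ERROR/CRITICAL note above, a minimal sketch of the filter, assuming LogFilterLevel exposes a `ge_` field like the other client-side filters (worth confirming against your Prefect version):
```
import logging

from prefect.client.schemas.filters import (
    LogFilter,
    LogFilterFlowRunId,
    LogFilterLevel,
)

log_filter = LogFilter(
    flow_run_id=LogFilterFlowRunId(any_=["<FLOW_RUN_ID>"]),
    # keep only records at ERROR (40) and above
    level=LogFilterLevel(ge_=logging.ERROR),
)
```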
j
@Marvin Can I get an example of having the log saved in an external store, and a link artifact to the log?
m
thought for 754.6 seconds
Yes—here are end-to-end examples that:
- Fetch ALL logs for the run (with pagination)
- Format them as plaintext
- Save to external storage
- Create a link artifact in Prefect so you can open the log in the browser

Notes:
- To render inline in a browser (instead of download), set Content-Type to text/plain and Content-Disposition to inline when uploading or when generating the signed URL.
- Prefect’s link artifacts open externally; rendering depends on your storage headers.

S3 example (presigned URL, inline view)
```
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_link_artifact
import asyncio
import boto3

PAGE_SIZE = 1000

def format_logs_plaintext(logs):
    logs.sort(key=lambda l: getattr(l, "timestamp", None))
    lines = []
    for l in logs:
        ts = getattr(l, "timestamp", "")
        lvl = getattr(getattr(l, "level", None), "name", str(getattr(l, "level", "")))
        name = getattr(l, "name", "")
        msg = getattr(l, "message", "")
        lines.append(f"{ts} {lvl} {name} - {msg}")
    return "\n".join(lines)

async def save_logs_to_s3_and_link(flow, flow_run, state,
                                   bucket: str = "my-bucket",
                                   key_prefix: str = "prefect/logs",
                                   expires_in: int = 7 * 24 * 3600):
    # 1) Fetch ALL logs (pagination)
    async with get_client() as client:
        offset = 0
        all_logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            all_logs.extend(batch)
            offset += len(batch)

    # 2) Format plaintext
    text = format_logs_plaintext(all_logs) or "<no logs found>"

    # 3) Upload as text/plain with inline disposition
    s3 = boto3.client("s3")
    key = f"{key_prefix}/{flow_run.id}.txt"
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=text.encode("utf-8"),
        ContentType="text/plain",
        ContentDisposition="inline",
    )

    # 4) Presign with explicit inline headers for browsers
    url = s3.generate_presigned_url(
        ClientMethod="get_object",
        Params={
            "Bucket": bucket, "Key": key,
            "ResponseContentType": "text/plain",
            "ResponseContentDisposition": "inline"
        },
        ExpiresIn=expires_in,
    )

    # 5) Create link artifact on the run
    create_link_artifact(
        link=url,
        link_text="View full logs (text)",
        key=f"{flow_run.id}-logs-link",
        description="Full plaintext logs stored in S3"
    )

@flow(
    on_completion=lambda f, fr, s: asyncio.run(save_logs_to_s3_and_link(f, fr, s)),
    on_failure=lambda f, fr, s: asyncio.run(save_logs_to_s3_and_link(f, fr, s)),
    on_crashed=lambda f, fr, s: asyncio.run(save_logs_to_s3_and_link(f, fr, s)),
)
def my_flow():
    # your flow here
    ...
```
GCS example (signed URL, inline view)
```
import asyncio
from datetime import timedelta

from google.cloud import storage

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_link_artifact

PAGE_SIZE = 1000

def format_logs_plaintext(logs):
    logs.sort(key=lambda l: getattr(l, "timestamp", None))
    return "\n".join(
        f"{getattr(l, 'timestamp', '')} "
        f"{getattr(getattr(l, 'level', None), 'name', str(getattr(l, 'level', '')))} "
        f"{getattr(l, 'name', '')} - {getattr(l, 'message', '')}"
        for l in logs
    ) or "<no logs found>"

async def save_logs_to_gcs_and_link(flow, flow_run, state,
                                    bucket_name: str = "my-bucket",
                                    blob_prefix: str = "prefect/logs",
                                    expires_in_seconds: int = 7 * 24 * 3600):
    # Fetch ALL logs (pagination)
    async with get_client() as client:
        offset = 0
        logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            logs.extend(batch)
            offset += len(batch)

    text = format_logs_plaintext(logs)

    # Upload as text/plain so browsers can render it inline
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob_name = f"{blob_prefix}/{flow_run.id}.txt"
    blob = bucket.blob(blob_name)
    blob.upload_from_string(text, content_type="text/plain")

    # Signed URL with inline disposition
    url = blob.generate_signed_url(
        expiration=timedelta(seconds=expires_in_seconds),
        response_disposition="inline",
    )

    create_link_artifact(
        link=url,
        link_text="View full logs (text)",
        key=f"{flow_run.id}-logs-link",
        description="Full plaintext logs stored in GCS",
    )

@flow(
    on_completion=lambda f, fr, s: asyncio.run(save_logs_to_gcs_and_link(f, fr, s)),
    on_failure=lambda f, fr, s: asyncio.run(save_logs_to_gcs_and_link(f, fr, s)),
    on_crashed=lambda f, fr, s: asyncio.run(save_logs_to_gcs_and_link(f, fr, s)),
)
def my_flow():
    ...
```
Azure Blob example (SAS URL, inline view)
```
import asyncio
from datetime import datetime, timedelta

from azure.storage.blob import (
    BlobServiceClient,
    ContentSettings,
    generate_blob_sas,
    BlobSasPermissions,
)

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_link_artifact

PAGE_SIZE = 1000

def format_logs_plaintext(logs):
    logs.sort(key=lambda l: getattr(l, "timestamp", None))
    return "\n".join(
        f"{getattr(l, 'timestamp', '')} "
        f"{getattr(getattr(l, 'level', None), 'name', str(getattr(l, 'level', '')))} "
        f"{getattr(l, 'name', '')} - {getattr(l, 'message', '')}"
        for l in logs
    ) or "<no logs found>"

async def save_logs_to_azure_and_link(flow, flow_run, state,
                                      connection_string: str,
                                      container: str = "logs",
                                      blob_prefix: str = "prefect/logs",
                                      expires_hours: int = 168,  # 7 days
                                      account_name: str = None,
                                      account_key: str = None):
    # Fetch ALL logs (pagination)
    async with get_client() as client:
        offset = 0
        logs = []
        while True:
            batch = await client.read_logs(
                log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run.id])),
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not batch:
                break
            logs.extend(batch)
            offset += len(batch)

    text = format_logs_plaintext(logs)

    # Upload as text/plain with inline disposition
    svc = BlobServiceClient.from_connection_string(connection_string)
    blob_name = f"{blob_prefix}/{flow_run.id}.txt"
    blob_client = svc.get_blob_client(container=container, blob=blob_name)
    blob_client.upload_blob(
        text.encode("utf-8"),
        overwrite=True,
        content_settings=ContentSettings(
            content_type="text/plain",
            content_disposition="inline",
        ),
    )

    # Generate SAS URL (requires account_name/account_key)
    sas = generate_blob_sas(
        account_name=account_name,
        container_name=container,
        blob_name=blob_name,
        account_key=account_key,
        permission=BlobSasPermissions(read=True),
        expiry=datetime.utcnow() + timedelta(hours=expires_hours),
    )
    url = f"https://{account_name}.blob.core.windows.net/{container}/{blob_name}?{sas}"

    create_link_artifact(
        link=url,
        link_text="View full logs (text)",
        key=f"{flow_run.id}-logs-link",
        description="Full plaintext logs stored in Azure Blob",
    )

@flow
def my_flow():
    ...

# Example hook binding:
# on_completion=lambda f, fr, s: asyncio.run(save_logs_to_azure_and_link(f, fr, s, connection_string=..., account_name=..., account_key=...))
```
Tips:
- For very large logs, gzip to save storage/bandwidth, but most browsers will download a bare .gz file instead of displaying it inline. If inline view is your priority, keep it as text/plain without compression, or see the Content-Encoding sketch below.
- If your storage is public-read or behind a CDN, you can skip signed URLs and link directly to the public object.
- If you want to keep an index artifact in Prefect that links to both the external file and (optionally) chunked inline artifacts, we can combine this with the earlier “index” pattern.

Docs:
- Artifacts API: prefect.artifacts
- Logs API/models: prefect.server.models.logs

If you tell me your exact storage (S3/GCS/Azure) and auth setup, I can tailor the snippet with best practices (e.g., bucket policies, cache headers, or CDN-friendly URLs).