Adam Lear
09/03/2025, 1:57 PM
Marvin
09/03/2025, 1:57 PM
Marvin
09/03/2025, 2:03 PM
from prefect import flow
from prefect.artifacts import create_markdown_artifact
import json

@flow
def my_flow():
    # Whatever you want to persist
    metadata = {
        "rows_loaded": 12345,
        "output_uri": "s3://my-bucket/data/run_2025_09_03.parquet",
        "notes": "Backfilled partition=2025-09-03",
    }
    # Artifact keys may contain only lowercase letters, numbers, and dashes
    create_markdown_artifact(
        markdown="Run metadata:\n```json\n" + json.dumps(metadata, indent=2) + "\n```",
        key="my-flow-run-metadata",
        description="Per-run metadata for my_flow",
    )
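If the metadata dict grows, it can be tidier to build the markdown body in a small helper. A minimal sketch: `render_metadata_markdown` is a hypothetical name (not part of Prefect), and the fence marker is assembled from a single backtick only to keep the snippet easy to paste into chat.

```python
import json

FENCE = "`" * 3  # three backticks, assembled from a single backtick


def render_metadata_markdown(metadata: dict, title: str = "Run metadata:") -> str:
    """Return markdown text with the metadata rendered as a fenced JSON block."""
    body = json.dumps(metadata, indent=2)
    return f"{title}\n{FENCE}json\n{body}\n{FENCE}"


if __name__ == "__main__":
    # Print the markdown that would be passed as markdown=... to the artifact call
    print(render_metadata_markdown({"rows_loaded": 12345}))
```

The returned string can be passed as the `markdown=` argument in the flow above.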
Example: attach a link to an external artifact (e.g., S3/GCS/DBT docs)
from prefect.artifacts import create_link_artifact

# Artifact keys may contain only lowercase letters, numbers, and dashes
create_link_artifact(
    link="s3://my-bucket/data/run_2025_09_03.parquet",
    link_text="Output dataset",
    key="my-flow-output",
)
Example: attach a small table/dict
from prefect.artifacts import create_table_artifact

# Artifact keys may contain only lowercase letters, numbers, and dashes
create_table_artifact(
    table=[{"table": "events", "rows_loaded": 12345, "status": "ok"}],
    key="my-flow-load-stats",
    description="Load stats per run",
)
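One thing to watch with the `key` argument: Prefect accepts only lowercase letters, numbers, and dashes in artifact keys, so names with underscores or slashes are rejected. A small sketch of a normalizer you could run names through first; `to_artifact_key` is a hypothetical helper, not a Prefect function.

```python
import re


def to_artifact_key(*parts: str) -> str:
    """Join parts into a key made only of lowercase letters, digits, and dashes."""
    raw = "-".join(parts).lower()
    # Replace every run of disallowed characters with a single dash
    key = re.sub(r"[^a-z0-9]+", "-", raw).strip("-")
    if not key:
        raise ValueError("artifact key would be empty after normalization")
    return key


if __name__ == "__main__":
    print(to_artifact_key("my_flow", "run-metadata"))
```

Usage would look like `key=to_artifact_key("my_flow", "load-stats")` in any of the calls above.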
Retrieving artifacts later
- CLI
prefect artifact ls
prefect artifact inspect "my-flow-run-metadata"
- Python API (filter by key or by flow_run_id)
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey, ArtifactFilterFlowRunId
from prefect.client.schemas.sorting import ArtifactSort

async def fetch_latest_by_key():
    async with get_client() as client:
        # Newest artifact for the key: filter by key, sort by creation time, take one
        latest = await client.read_artifacts(
            artifact_filter=ArtifactFilter(
                key=ArtifactFilterKey(any_=["my-flow-run-metadata"])
            ),
            sort=ArtifactSort.CREATED_DESC,
            limit=1,
        )
        for a in latest:
            print(a.key, a.type, a.data)

async def fetch_all_for_run(run_id: str):
    async with get_client() as client:
        # Every artifact produced by one flow run
        arts = await client.read_artifacts(
            artifact_filter=ArtifactFilter(
                flow_run_id=ArtifactFilterFlowRunId(any_=[run_id])
            )
        )
        for a in arts:
            print(a.key, a.type, a.data)

asyncio.run(fetch_latest_by_key())
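To get the original dict back out of a markdown artifact, you would need to parse the JSON from the markdown text. A rough sketch, assuming `a.data` holds the markdown string exactly as it was written by the flow above; `extract_json_block` is a hypothetical helper, not part of Prefect.

```python
import json
import re
from typing import Any

# A fenced block tagged "json": three backticks, "json", the body, three backticks
_JSON_BLOCK = re.compile(r"`{3}json\s*\n(.*?)\n`{3}", re.DOTALL)


def extract_json_block(markdown: str) -> Any:
    """Parse and return the first fenced JSON block found in the markdown text."""
    match = _JSON_BLOCK.search(markdown)
    if match is None:
        raise ValueError("no fenced json block found")
    return json.loads(match.group(1))
```

Inside the loops above this would be called as `extract_json_block(a.data)` for artifacts whose `a.type` is markdown. If you need structured data back regularly, a table artifact may be a more direct fit than parsing markdown.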
Alternatives (use sparingly)
- Update flow run attributes via the API (e.g., name, tags) if you only need small labels/strings:
from prefect.runtime import flow_run
from prefect.client.orchestration import get_client
import asyncio

async def tag_run():
    async with get_client() as client:
        # Call this from inside a running flow so that flow_run.id is populated
        await client.update_flow_run(
            flow_run_id=flow_run.id,
            tags=["output:s3", "backfill"],
        )

asyncio.run(tag_run())
Note: tags are great for filtering/searching, but not for storing rich data. There is no generic “metadata” field on flow runs in 3.x, and mutating parameters mid-run is generally discouraged.
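If you do go the tag route, a `key:value` naming habit like `output:s3` keeps the labels parseable later. Prefect treats tags as plain strings, so the convention is entirely yours. A minimal sketch with two hypothetical helpers, `to_tags` and `from_tags`:

```python
def to_tags(labels: dict[str, str]) -> list[str]:
    """Encode small string labels as sorted 'key:value' tags."""
    return [f"{key}:{value}" for key, value in sorted(labels.items())]


def from_tags(tags: list[str]) -> dict[str, str]:
    """Decode 'key:value' tags back into a dict, skipping plain tags."""
    parsed: dict[str, str] = {}
    for tag in tags:
        key, sep, value = tag.partition(":")
        if sep:  # plain tags such as "backfill" have no separator
            parsed[key] = value
    return parsed
```

The list from `to_tags` could be passed as `tags=` in the update call above, and `from_tags` applied to a run's tags when reading them back.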
Docs and references
- Artifacts CLI (prefect artifact)
- API Client: Using the Prefect API client
- Flow Runs API reference: Flow runs API
Would you like help picking the right artifact type for your data (e.g., a JSON blob vs. a link), and how you plan to retrieve it (UI, API, or CLI)?