Andi
09/05/2025, 10:32 PM
Marvin
09/05/2025, 10:32 PM
Marvin
09/05/2025, 10:34 PM
@materialize decorator to declare which asset(s) the task produces. You can also attach metadata (e.g., from the return value).
- You still write the actual asset to your store (S3, DB, etc.); Prefect tracks the asset key and its materializations.
Example:
from prefect import flow
from prefect.assets import Asset, materialize, add_asset_metadata

# Asset keys are plain URI strings; Asset takes the key as a keyword argument
orders = Asset(key="s3://my-bucket/dwh/orders.parquet")

@materialize(orders, by="orders-writer")
def write_orders():
    # ...write data to S3/DB...
    rows_written = 1234
    # Attach metadata to the asset declared in @materialize above
    add_asset_metadata(orders, {"rows_written": rows_written})
    return rows_written

@flow
def main():
    write_orders()

if __name__ == "__main__":
    main()
If you meant making the task’s return show up in the UI as a “rich thing” (artifact):
- Create an artifact from inside the task (e.g., markdown/table/image), which does not require a flow wrapper either.
from prefect import task, flow
from prefect.artifacts import create_table_artifact

@task
def summarize():
    rows = [{"metric": "rows_written", "value": 1234}]
    create_table_artifact(table=rows, description="Ingestion summary")
    return rows

@flow
def main():
    summarize()
Docs:
- Assets and materializations: Assets & materializations
- Artifacts: Artifacts
Quick check: Are you aiming for Prefect “Assets” (external resources tracked by key) or “Artifacts” (UI-rich attachments)? I can tailor the snippet to your exact use case.
Andi
09/05/2025, 10:40 PM
Marvin
09/05/2025, 10:40 PM
Marvin
09/05/2025, 10:45 PM
@materialize() (you don’t have to list assets up front).
- Inside the task, compute the key and call add_asset_metadata(key, {...}). Doing so both registers that asset for this task run and attaches metadata; when the task succeeds, Prefect emits a materialization event for each registered key.
Example (single dynamic key):
from prefect import flow
from prefect.assets import materialize, add_asset_metadata

@materialize(by="writer-task")
def write_partition(dt: str) -> str:
    # Compute the asset key at runtime (must be a URI-like string)
    key = f"s3://my-bucket/dt={dt}/data.parquet"
    # ... write data to your store ...
    # Register the asset + attach metadata; this marks it for materialization on success
    add_asset_metadata(key, {"rows": 1234, "dt": dt})
    return key

@flow
def main():
    write_partition("2025-09-05")
Multiple dynamic assets:
@materialize(by="bulk-writer")
def write_many(parts: list[str]) -> list[str]:
    keys = []
    for p in parts:
        key = f"s3://my-bucket/part={p}/data.parquet"
        # ... write ...
        add_asset_metadata(key, {"part": p})
        keys.append(key)
    return keys
Notes:
- Asset keys must be valid URIs (e.g., s3://..., gs://..., snowflake://...).
- If you need to represent upstream asset dependencies in the graph, you can set asset_deps on the task (or pass assets to @materialize for the known ones) and still add more keys dynamically with add_asset_metadata.
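If it helps, here is a rough pre-check for the "valid URI" note above, so a malformed key fails fast before any Prefect call. This is only a sketch using the standard library: the helper name is_uri_like_key is hypothetical, and the rule it applies (a scheme, then "://", then something after it) is an assumption about what "URI-like" means, not Prefect's own validation.

```python
from urllib.parse import urlparse


def is_uri_like_key(key: str) -> bool:
    """Hypothetical helper: True if key looks like scheme://something."""
    parsed = urlparse(key)
    # Require a scheme, the "://" separator, and a non-empty remainder.
    has_rest = bool(parsed.netloc or parsed.path.strip("/"))
    return bool(parsed.scheme) and "://" in key and has_rest


good = is_uri_like_key("s3://my-bucket/dt=2025-09-05/data.parquet")
wrapped = is_uri_like_key("<s3://my-bucket/dt=2025-09-05/data.parquet>")
bare = is_uri_like_key("my-bucket/data.parquet")
```

A key that was copied with angle brackets around it, or one without a scheme, is rejected by this check.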
Docs:
- Assets & materializations overview: Assets & materializations
If you can share what your dynamic keys look like (e.g., partitioned S3 paths, table names, etc.), I can tailor the snippet and URI formatting.
Andi
09/05/2025, 11:30 PM
Marvin
09/05/2025, 11:30 PM
Marvin
09/05/2025, 11:31 PM
add_asset_metadata(...) does not create a new materialization by itself; it only attaches metadata to assets that the current task is already materializing via @materialize(...). If no asset was declared for the task, calling add_asset_metadata won’t create one.
- If you need to materialize a key that is computed at runtime from the task’s return value, do it inside the same task run, either by:
- Using @materialize(...) for known assets and attaching metadata, or
- Emitting an asset materialization event imperatively.
Imperative option (works with fully dynamic keys):
from prefect import task, flow
from prefect.events import emit_event

@task
def write_and_materialize(part: str):
    key = f"s3://my-bucket/part={part}/data.parquet"
    # ...write your data...
    # Imperatively emit an asset materialization event for the dynamic key.
    # No explicit related= argument here: related_resources_from_run_context is an
    # async helper that needs a client, and emit_event is expected to attach the
    # current run's resources when called inside a task run.
    emit_event(
        event="prefect.asset.materialization.succeeded",
        resource={"prefect.resource.id": f"prefect.asset.{key}"},
        payload={"part": part, "rows": 1234},
    )
    return key

@flow
def main():
    write_and_materialize("2025-09-05")
Notes:
- The event must be emitted during a running task/flow to be linked in the UI. You can emit it from a follow-up task if needed, but it won’t attach to the already-finished earlier task run.
- Asset keys must be URI-like (e.g., s3://..., gs://..., snowflake://...).
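One way to keep the imperative option testable is to assemble the event fields as plain data first and only pass them to emit_event inside the running task. The sketch below assumes the event name and resource id format from the snippet above (they are copied from it, not checked against Prefect's event schema), and build_materialization_event is a hypothetical helper name.

```python
from typing import Any


def build_materialization_event(key: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: return keyword arguments for emit_event."""
    if "://" not in key:
        raise ValueError(f"asset key is not URI-like: {key!r}")
    return {
        # Event name and resource id format are taken from the snippet above.
        "event": "prefect.asset.materialization.succeeded",
        "resource": {"prefect.resource.id": f"prefect.asset.{key}"},
        "payload": dict(payload),  # copy so later mutation does not leak in
    }


kwargs = build_materialization_event(
    "s3://my-bucket/part=2025-09-05/data.parquet",
    {"part": "2025-09-05", "rows": 1234},
)
# Inside a running task you would then call: emit_event(**kwargs)
```

Because the helper touches no Prefect APIs, the key format and payload can be checked in an ordinary unit test before anything is emitted.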
If you can share the snippet you tried, I can help adapt it so the materialization is recorded for your dynamic key.