<@ULVA73B9P> is there a way to make task return va...
# ask-marvin
a
@Marvin is there a way to make task return value as an asset/material without writing a custom flow?
m
thought for 120.1 seconds
Short answer: yes—with Prefect 3.x you can mark a task as materializing an asset without writing a custom flow wrapper. If you mean Prefect “Assets” (tracked via materialization): - Use the
@materialize
decorator to declare which asset(s) the task produces. You can also attach metadata (e.g., from the return value). - You still write the actual asset to your store (S3, DB, etc.); Prefect tracks the asset key and its materializations. Example:
Copy code
from prefect import flow
from prefect.assets import Asset, materialize, add_asset_metadata

orders = Asset("<s3://my-bucket/dwh/orders.parquet>")

@materialize(orders, by="orders-writer")
def write_orders():
    # ...write data to S3/DB...
    rows_written = 1234
    add_asset_metadata(orders, {"rows_written": rows_written})
    return rows_written

@flow
def main():
    write_orders()

if __name__ == "__main__":
    main()
If you meant making the task’s return show up in the UI as a “rich thing” (artifact): - Create an artifact from inside the task (e.g., markdown/table/image), which does not require a flow wrapper either.
Copy code
from prefect import task, flow
from prefect.artifacts import create_table_artifact

@task
def summarize():
    rows = [{"metric": "rows_written", "value": 1234}]
    create_table_artifact(table=rows, description="Ingestion summary")
    return rows

@flow
def main():
    summarize()
Docs: - Assets and materializations: Assets &amp; materializations - Artifacts: Artifacts Quick check: Are you aiming for Prefect “Assets” (external resources tracked by key) or “Artifacts” (UI-rich attachments)? I can tailor the snippet to your exact use case.
a
@Marvin No, I mean dynamically materialize an asset based on the return value of a task (return value as the asset key). Your example shows how to add metadata, but what I need is to add the asset itself
m
thought for 277.3 seconds
Yes—you can dynamically materialize assets where the key is computed at runtime, without a custom flow wrapper. Pattern: - Decorate the task with
@materialize()
(you don’t have to list assets up front). - Inside the task, compute the key and call
add_asset_metadata(key, {...})
. Doing so both registers that asset for this task run and attaches metadata; when the task succeeds, Prefect emits a materialization event for each registered key. Example (single dynamic key):
Copy code
from prefect import flow
from prefect.assets import materialize, add_asset_metadata

@materialize(by="writer-task")
def write_partition(dt: str) -> str:
    # Compute the asset key at runtime (must be a URI-like string)
    key = f"<s3://my-bucket/dt={dt}/data.parquet>"

    # ... write data to your store ...

    # Register the asset + attach metadata; this marks it for materialization on success
    add_asset_metadata(key, {"rows": 1234, "dt": dt})

    return key

@flow
def main():
    write_partition("2025-09-05")
Multiple dynamic assets:
Copy code
@materialize(by="bulk-writer")
def write_many(parts: list[str]) -> list[str]:
    keys = []
    for p in parts:
        key = f"<s3://my-bucket/part={p}/data.parquet>"
        # ... write ...
        add_asset_metadata(key, {"part": p})
        keys.append(key)
    return keys
Notes: - Asset keys must be valid URIs (e.g.,
s3://...
,
gs://...
,
snowflake://...
). - If you need to represent upstream asset dependencies in the graph, you can set
asset_deps
on the task (or pass assets to
@materialize
for the known ones) and still add more keys dynamically with
add_asset_metadata
. Docs: - Assets & materializations overview: Assets &amp; materializations If you can share what your dynamic keys look like (e.g., partitioned S3 paths, table names, etc.), I can tailor the snippet and URI formatting.
a
@Marvin That doesn't seem to work, how about adding asset to an already finished task, is it possible?
m
thought for 57.1 seconds
Short answer: no — you can’t “add an asset to” a task that has already finished. Asset materialization events are emitted during the task’s run. After the task completes, you can’t retroactively attach a new asset materialization to that past task run. A couple clarifications/workarounds: -
add_asset_metadata(...)
does not create a new materialization by itself; it only attaches metadata to assets that the current task is already materializing via
@materialize(...)
. If no asset was declared for the task, calling
add_asset_metadata
won’t create one. - If you need to materialize a key that is computed at runtime from the task’s return value, do it inside the same task run, either by: - Using
@materialize(...)
for known assets and attaching metadata, or - Emitting an asset materialization event imperatively. Imperative option (works with fully dynamic keys):
Copy code
from prefect import task, flow
from prefect.events import emit_event
from prefect.events.related import related_resources_from_run_context

@task
def write_and_materialize(part: str):
    key = f"<s3://my-bucket/part={part}/data.parquet>"

    # ...write your data...

    # Imperatively emit an asset materialization event for the dynamic key
    emit_event(
        event="prefect.asset.materialization.succeeded",
        resource={"prefect.resource.id": f"prefect.asset.{key}"},
        related=related_resources_from_run_context(),  # links to the current run
        payload={"part": part, "rows": 1234},
    )
    return key

@flow
def main():
    write_and_materialize("2025-09-05")
Notes: - The event must be emitted during a running task/flow to be linked in the UI. You can emit it from a follow-up task if needed, but it won’t attach to the already-finished earlier task run. - Asset keys must be URI-like (e.g.,
s3://...
,
gs://...
,
snowflake://...
). If you can share the snippet you tried, I can help adapt it so the materialization is recorded for your dynamic key.