<@ULVA73B9P> i am using prefect 3.4.10 with the fo...
# ask-marvin
d
@Marvin i am using prefect 3.4.10 with the following high-level architecture pattern:
Copy code
@task
def task_handler(task_path: str, task_params: dict, input_data: Any) -> Any:
    """
    Dynamically imports and executes a single task module.
    """
    task_class = import_class(task_path)
    task_instance = task_class()
    if input_data is not None:
        return task_instance.run(input_data, **task_params)
    else:
        return task_instance.run(**task_params)

@flow
def workflow_executor(task_sequence: List[Dict[str, Any]]):
    """
    Dynamically builds and runs a workflow from a given task sequence definition.
    """
    intermediate_data = None
    for task_config in task_sequence:
        intermediate_data = task_handler(
            task_path=task_config["path"],
            task_params=task_config["params"],
            input_data=intermediate_data,
        )
    return intermediate_data

# Constants and deployment
WORKFLOW_NAME = workflow_executor.name

workflow_deployment = await workflow_executor.ato_deployment(
    name="DEPLOYMENT_IDENTIFIER",
    work_pool_name="WORK_POOL_IDENTIFIER",
    entrypoint_type=EntrypointType.MODULE_PATH
)
await workflow_deployment.apply()

# Client interaction
async with get_client() as client:
    try:
        deployment_info = await client.read_deployment_by_name(f"{WORKFLOW_NAME}/DEPLOYMENT_IDENTIFIER")
        
        workflow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment_info.id,
            parameters={"task_sequence": [task.model_dump() for task in payload.task_sequence]},
        )
        
        run_link = f"<http://localhost:4200/flow-runs/flow-run/{workflow_run.id}>"
this allows for direct passing of data between subsequent tasks using _upstream_data._ Is this a prefect native way of doing things? What if i would like to pass files / objects between tasks instead? What is the prefect native way to pass such data?
m
thought for 559.5 seconds
Great question — and nice pattern. Short answer: - Yes, passing values from one task to the next via return values inside a flow is the native Prefect way. - For large files/objects, the native pattern is to persist results or write to external storage (e.g., S3/GCS/local filesystem) and pass a reference (path/key) instead of the bytes. See Results and persistence in Prefect 3. Details and options: 1) Small/medium Python objects: return values - What you’re doing (returning
intermediate_data
from each task and passing it into the next) is idiomatic Prefect. - Tip for observability: give the generic “handler” task a dynamic name so the UI is more readable.
Copy code
@task(task_run_name=lambda task_path, **_: f"handler[{task_path}]")
  def task_handler(task_path: str, task_params: dict, input_data: Any) -> Any:
      ...
2) Large objects: enable Prefect result persistence - Let Prefect persist task results to a filesystem/object store and (optionally) avoid keeping big payloads in memory. - Configure per-task:
Copy code
from prefect import task, flow
  from prefect.filesystems import RemoteFileSystem
  from prefect.serializers import CompressedPickleSerializer

  remote_fs = RemoteFileSystem(basepath="<s3://my-bucket/prefect/results>")

  @task(
      persist_result=True,
      result_storage=remote_fs,
      result_serializer=CompressedPickleSerializer(),
      cache_result_in_memory=False,  # force downstreams to re-read from storage
  )
  def produce_large_object():
      # return a big object (e.g., DataFrame, bytes)
      ...

  @task
  def consume_large_object(obj):
      ...

  @flow
  def my_flow():
      big = produce_large_object()
      consume_large_object(big)
- Docs: Results and persistence in Prefect 3 3) Large files: write to object storage and pass references - For true “files”, write to S3/GCS/etc. and pass just the key/path:
Copy code
import uuid
  from prefect import task, flow
  from prefect_aws.s3 import upload_to_bucket, download_from_bucket, AwsCredentials

  creds = AwsCredentials.from_environment()

  @task
  def write_file_to_s3(data: bytes) -> str:
      key = f"runs/{uuid.uuid4()}/data.bin"
      upload_to_bucket(data=data, bucket="my-bucket", aws_credentials=creds, key=key)
      return key

  @task
  def read_file_from_s3(key: str) -> bytes:
      return download_from_bucket(bucket="my-bucket", key=key, aws_credentials=creds)

  @flow
  def file_flow():
      key = write_file_to_s3(b"some big payload")
      data = read_file_from_s3(key)
- This pattern avoids moving large payloads through task boundaries and works well across processes/containers. A note on your dynamic “task module” pattern - It’s perfectly fine, but be aware the UI will show repeated runs of the same
task_handler
. The
task_run_name
tip above helps. - If you want per-step observability with parameters/lineage, consider wrapping each unit as a proper
@task
or a small subflow, or augment your handler to emit artifacts/metadata. Programmatic deployments in Prefect 3.x (fixes to your snippet) - There’s no
to_deployment
or
ato_deployment
in 3.x. Use
Flow.deploy(...)
or
Flow.from_source(...).deploy(...)
. - Simple in-code deployment:
Copy code
from prefect import flow
  from prefect.types.entrypoint import EntrypointType

  @flow
  def workflow_executor(task_sequence: list[dict]):
      ...

  deployment_id = workflow_executor.deploy(
      name="DEPLOYMENT_IDENTIFIER",
      work_pool_name="WORK_POOL_IDENTIFIER",
      entrypoint_type=EntrypointType.MODULE_PATH,  # optional; Prefect will infer in many cases
  )
- From remote source: ``` from prefect.flows import Flow flow_obj = await Flow.from_source( source="https://github.com/org/repo", entrypoint="path/to/file.py:workflow_executor", ) deployment_id = await flow_obj.deploy( name="DEPLOYMENT_IDENTIFIER",
work_pool_name="WORK_POOL_IDENTIFIER", )
Copy code
- To trigger a deployment from Python without managing the client directly you can also use:
from prefect.deployments import run_deployment flow_run = await run_deployment( name="workflow_executor/DEPLOYMENT_IDENTIFIER", parameters={"task_sequence": [...]}, ) ``` - Deployment docs: Create deployments in Prefect 3 Questions to tailor a concrete recommendation: - How big are the files/objects you want to pass? - Where do you want to store them (S3, GCS, Azure, shared volume, local disk)? - Do your tasks ever run in separate processes/containers where in-memory handoff is not viable? With those answers I can sketch an exact config (result storage vs object store, serializers, and any collection code you’ll need).
d
@Marvin 2 follow up questions: 1. @task(_task_run_name_=lambda _task_path_, **___: f"handler[{task_path}]") causes following error:
Copy code
Encountered exception during execution: TypeError("<lambda>() missing 1 required positional argument: 'task_path'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 765, in run_context
    yield self
  File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 1373, in run_flow_sync
    engine.call_flow_fn()
  File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 785, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
  File "/home/xx/flows.py", line 25, in workflow_executor
    intermediate_data = task_handler(
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 1139, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 1714, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 1479, in run_task_sync
    with engine.start(task_run_id=task_run_id, dependencies=dependencies):
  File "/usr/local/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 790, in start
    self._set_custom_task_run_name()
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 215, in _set_custom_task_run_name
    task_run_name = resolve_custom_task_run_name(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/_engine.py", line 69, in resolve_custom_task_run_name
    task_run_name = task.task_run_name()
TypeError: <lambda>() missing 1 required positional argument: 'task_path'
2. what about assets or materialize? why not use this to pass data between tasks?
m
thought for 258.8 seconds
Thanks for the follow-ups! 1) task_run_name lambda error Your lambda is being called without keyword arguments; in Prefect 3 the callable is invoked with a single dict of parameters. That’s why
lambda task_path, **_
fails. Use either a format string or a one-arg callable that accepts a parameters dict: - Simplest (format string):
Copy code
@task(task_run_name="handler[{task_path}]")
def task_handler(task_path: str, task_params: dict, input_data: Any) -> Any:
    ...
- Or a dict-arg callable:
Copy code
@task(task_run_name=lambda params: f"handler[{params.get('task_path', '?')}]")
def task_handler(task_path: str, task_params: dict, input_data: Any) -> Any:
    ...
Under the hood, Prefect resolves the task run name using the parameters dict. That’s why a callable expecting
task_path
as an arg fails. 2) Should I use assets/materialize to pass data? Think of assets as a way to model and track real data products (tables, files, etc.) and their lineage in the UI — not as a byte transport between tasks. - Use assets to declare “this task materializes s3://bucket/path.parquet” and “that task depends on that asset”. - Still use one of: - Return values for small/medium Python objects - Result persistence for large Python objects (store to filesystem/object store and pass references) - Explicit object storage I/O for files (write/read S3, GCS, etc., pass keys/paths) A light example showing how assets fit in with passing references:
Copy code
from prefect import flow
from prefect.assets import Asset, materialize, add_asset_metadata
from prefect_aws.s3 import upload_to_bucket, download_from_bucket, AwsCredentials

raw_asset = Asset("<s3://my-bucket/data/raw.parquet>")
clean_asset = Asset("<s3://my-bucket/data/clean.parquet>")

creds = AwsCredentials.from_environment()

@materialize(raw_asset, by="extract")
def extract_raw() -> str:
    # write the raw file and return the key
    key = "data/raw.parquet"
    upload_to_bucket(data=b"...", bucket="my-bucket", aws_credentials=creds, key=key)
    add_asset_metadata(raw_asset, {"s3_key": key})
    return key

@materialize(clean_asset, by="transform", asset_deps=[raw_asset])
def transform_to_clean(raw_key: str) -> str:
    raw_bytes = download_from_bucket(bucket="my-bucket", key=raw_key, aws_credentials=creds)
    # transform...
    clean_key = "data/clean.parquet"
    upload_to_bucket(data=b"...", bucket="my-bucket", aws_credentials=creds, key=clean_key)
    add_asset_metadata(clean_asset, {"s3_key": clean_key})
    return clean_key

@flow
def pipeline():
    raw_key = extract_raw()
    clean_key = transform_to_clean(raw_key)
    return clean_key
Key takeaways: - Assets give you lineage, discoverability, and UI integration for data products. - They don’t “carry” the data; you still persist to storage and pass references (keys/paths) or use Prefect results for automatic persistence. Links: - Results and persistence: https://docs-3.prefect.io/v3/advanced/results - Assets overview: https://docs-3.prefect.io/v3/concepts/assets - Assets advanced usage: https://docs-3.prefect.io/v3/advanced/assets If you share where your files live (S3/GCS/Azure/local) and typical sizes, I can suggest a concrete pattern (result storage config vs. explicit object-store reads/writes) and serializer choices.
d
@Marvin can i just directly replace a task with materialize?
m
thought for 35.9 seconds
Short answer: yes, you can replace a task with materialize when that function produces or updates a real “data asset” (file, table, dataset). It’s still a Prefect task under the hood and works inside flows the same way. What changes when you use materialize: - You declare which Asset(s) the function materializes so Prefect can track lineage and metadata in the UI. - You still control task behavior (retries, result persistence, etc.) via task kwargs. - You still move data the same way (return values for small data, result persistence or object store for large data). materialize does not “carry” the bytes; it records the intent and metadata of the asset. Drop-in example Before (plain task):
Copy code
from prefect import task, flow

@task(retries=3)
def write_parquet(df) -> str:
    key = "data/raw.parquet"
    # write df -> <s3://my-bucket/data/raw.parquet>
    return key

@flow
def pipeline(df):
    key = write_parquet(df)
    return key
After (materialize):
Copy code
from prefect import flow
from prefect.assets import Asset, materialize, add_asset_metadata

raw_asset = Asset("<s3://my-bucket/data/raw.parquet>")

@materialize(raw_asset, by="pandas", retries=3, task_run_name="materialize[raw.parquet]")
def write_parquet(df) -> str:
    key = "data/raw.parquet"
    # write df -> <s3://my-bucket/data/raw.parquet>
    add_asset_metadata(raw_asset, {"s3_key": key})
    return key

@flow
def pipeline(df):
    key = write_parquet(df)
    return key
Notes and tips - You can pass multiple assets if the function materializes more than one:
@materialize(asset_a, asset_b, ...)
. - You can declare upstream asset lineage with
asset_deps
via task kwargs:
Copy code
@materialize(clean_asset, asset_deps=[raw_asset])
  def transform(...): ...
- You can still enable Prefect result persistence or use object storage I/O exactly as with @task:
Copy code
@materialize(raw_asset, persist_result=True, result_storage=remote_fs, cache_result_in_memory=False)
  def produce_big_obj(...): ...
- If your function does not correspond to a durable data product (pure compute, API call, control logic), keep using
@task
. Docs - Assets overview: https://docs-3.prefect.io/v3/concepts/assets - Advanced assets: https://docs-3.prefect.io/v3/advanced/assets - Results and persistence: https://docs-3.prefect.io/v3/advanced/results
thought for 13.8 seconds
Looks like your message got cut off — could you share the full question? Do you mean any of the following? - Possible to replace every
@task
with
@materialize
? - Possible to pass data directly between tasks using assets (without storing files/objects)? - Possible to use
materialize
for non-durable compute steps? - Possible to use
materialize
without defining
Asset
keys? - Possible to combine
task_run_name
formatting with
materialize
? - Something else? If you can clarify which scenario you’re asking about (and maybe a short code snippet), I’ll give a concrete example.
d
@Marvin possible to dynamically create asset when task is called?
m
thought for 69.1 seconds
Short answer: not with a single, statically-decorated function. The assets a task materializes are fixed at decoration time. But there are good patterns to achieve “dynamic” assets: Options 1) Partitioned asset (recommended for many dynamic cases) - Use one canonical Asset key and record the specific target (e.g., S3 key, partition) as metadata. - Each run materializes the same asset but with different partition metadata.
Copy code
from prefect.assets import Asset, materialize, add_asset_metadata

dataset = Asset("<s3://my-bucket/dataset>")

@materialize(dataset, by="writer")
def write_partition(partition: str, data: bytes) -> str:
    key = f"dataset/{partition}.parquet"
    # write data -> <s3://my-bucket/{key}>
    add_asset_metadata(dataset, {"partition": partition, "s3_key": key})
    return key
2) Build a materializing task dynamically for each asset key - Create the Asset object and wrap your function with
materialize(...)
at runtime via a small factory. This gives you a new MaterializingTask per asset key.
Copy code
from prefect import flow
from prefect.assets import Asset, materialize

def build_materializer(asset_key: str, **task_kwargs):
    asset = Asset(asset_key)
    def decorator(fn):
        return materialize(asset, **task_kwargs)(fn)
    return decorator

def make_writer(asset_key: str):
    @build_materializer(asset_key, by="writer", task_run_name=f"materialize[{asset_key}]")
    def write_bytes(data: bytes) -> str:
        # write bytes to asset_key (e.g., S3 path derived from asset_key)
        return asset_key
    return write_bytes

@flow
def pipeline(keys: list[str]):
    for k in keys:
        writer = make_writer(f"<s3://my-bucket/{k}>")
        writer(b"...")
Notes and caveats - Dynamic factory creates new task objects at runtime; this is fine, but be mindful of generating lots of unique task definitions (naming helps readability). - If you only need to record where data went (and not create many separate assets), the partitioned approach (Option 1) is simpler and keeps lineage compact. -
add_asset_metadata
augments metadata; you still need a materializing task execution for Prefect to record a materialization event. - Assets model data products and lineage; they don’t replace how bytes move. For large data, still persist to storage and pass references. Docs - Assets overview: https://docs-3.prefect.io/v3/concepts/assets - Advanced assets: https://docs-3.prefect.io/v3/advanced/assets
d
@Marvin show how the partitioned asset works with fsspec based local filesystem / shared mount docker volume would work for passing uris of objects
m
thought for 65.1 seconds
Yes — here’s a concrete “partitioned asset” pattern using fsspec and a shared local path (e.g., a Docker volume mounted at /mnt/shared). The flow passes file:// URIs between steps while assets capture lineage/metadata. Key ideas - Use one canonical Asset key for the dataset root (file:///mnt/shared/<dataset>). - Each run writes a partitioned file under that root and returns its file:// URI. - Downstream steps receive the URI, read the file, and optionally materialize a downstream asset. - Ensure the same host path is mounted into all worker containers at the same mount point, so file:// URIs resolve identically everywhere. Example
Copy code
from __future__ import annotations

import fsspec
from pathlib import Path
from urllib.parse import urlparse

from prefect import flow, task
from prefect.assets import Asset, materialize, add_asset_metadata

# Canonical dataset roots as assets (partitioned by filename)
raw_dataset = Asset("file:///mnt/shared/datasets/raw")
clean_dataset = Asset("file:///mnt/shared/datasets/clean")

def ensure_parent_dir(uri: str) -> None:
    """
    Create parent directory for a file:// URI on the local filesystem.
    """
    p = urlparse(uri)
    if p.scheme != "file":
        return
    Path(p.path).parent.mkdir(parents=True, exist_ok=True)

@materialize(raw_dataset, by="writer", task_run_name=lambda params: f"materialize[raw:{params.get('partition','?')}]")
def write_raw_partition(partition: str, data: bytes) -> str:
    """
    Write a 'partition' as a file under the raw dataset root and return the file:// URI.
    """
    target_uri = f"{raw_dataset.key}/{partition}.bin"  # e.g. file:///mnt/shared/datasets/raw/2025-08-25.bin
    ensure_parent_dir(target_uri)
    with fsspec.open(target_uri, "wb") as f:
        f.write(data)

    # Record which partition and exact URI we materialized
    add_asset_metadata(raw_dataset, {"partition": partition, "uri": target_uri})
    return target_uri  # Pass URIs downstream

@task
def read_uri(uri: str) -> bytes:
    with fsspec.open(uri, "rb") as f:
        return f.read()

@materialize(clean_dataset, by="transform", asset_deps=[raw_dataset],
             task_run_name=lambda params: f"materialize[clean:from={Path(urlparse(params.get('raw_uri','')).path).name or '?'}]")
def transform_partition(raw_uri: str) -> str:
    """
    Read a raw partition by URI, transform it, write a new 'clean' partition, and return its URI.
    """
    # Derive partition name from the source file
    partition_name = Path(urlparse(raw_uri).path).stem  # e.g. '2025-08-25'
    clean_uri = f"{clean_dataset.key}/{partition_name}.bin"

    raw_bytes = read_uri(raw_uri)
    transformed = raw_bytes.upper()  # placeholder transform

    ensure_parent_dir(clean_uri)
    with fsspec.open(clean_uri, "wb") as f:
        f.write(transformed)

    add_asset_metadata(clean_dataset, {"partition": partition_name, "uri": clean_uri})
    return clean_uri

@flow
def pipeline(partitions: list[str]):
    # Write raw partitions and collect their URIs
    raw_uris = [write_raw_partition(p, f"payload-for-{p}".encode()) for p in partitions]

    # Transform each raw partition into a clean partition
    clean_uris = [transform_partition(u) for u in raw_uris]

    # Optionally read/inspect a result
    sample = read_uri(clean_uris[0])
    return {"raw": raw_uris, "clean": clean_uris, "sample": sample}

if __name__ == "__main__":
    # Example run
    pipeline(["2025-08-25", "2025-08-26"])
Notes - Asset keys are URIs. For local/shared volumes, use file:///absolute/path. Example: file:///mnt/shared/datasets/raw. - The functions still pass data via return values — in this case, the returned values are file:// URIs. That’s the “native” and scalable way to hand off large objects. - materialize records lineage and metadata for the canonical dataset asset; partitions are tracked via your metadata (e.g., partition name, exact file URI).
- If multiple workers/containers run this flow, ensure the same host directory is mounted at /mnt/shared in all of them so the URIs resolve consistently. - You can add retries, result persistence, etc., to materialize just like @task (they’re still tasks under the hood). Docs - Assets overview: Assets in Prefect 3 - Advanced assets: Advanced asset usage
d
@Marvin i tried to use @materialize(my_asset, _task_run_name_="name: [{task_path}]") with my_asset = Asset("file:///mnt/shared/some_path"). faced the error:
Copy code
Traceback (most recent call last):
  File "flows.py", line 9, in <module>
    my_asset  = Asset("file:///mnt/shared/some_path")
TypeError: BaseModel.__init__() takes 1 positional argument but 2 were given
m
thought for 23.9 seconds
You’re hitting a Pydantic v2 quirk: Prefect 3’s models are keyword-only. Instantiate Asset with keywords, not positionals. - Wrong:
Copy code
my_asset = Asset("file:///mnt/shared/some_path")
- Right:
Copy code
from prefect.assets import Asset

my_asset = Asset(key="file:///mnt/shared/some_path")
# optionally:
# from prefect.assets import AssetProperties
# my_asset = Asset(key="file:///mnt/shared/some_path", properties=AssetProperties(description="..."))
A few more notes: - Use a proper file URI with three slashes and an absolute path:
file:///mnt/shared/some_path
. - Your
task_run_name
usage with materialize is fine; just ensure the parameter you reference exists in the function signature. Example putting it together:
Copy code
from prefect.assets import Asset, materialize

my_asset = Asset(key="file:///mnt/shared/some_path")

@materialize(my_asset, task_run_name="name[{task_path}]")
def my_task(task_path: str, payload: dict):
    # do work...
    return "ok"
If you prefer a callable for task_run_name, it must accept a single parameters dict:
Copy code
@materialize(my_asset, task_run_name=lambda params: f"name[{params.get('task_path','?')}]")
def my_task(task_path: str, payload: dict):
    ...
Docs: - Assets overview: https://docs-3.prefect.io/v3/concepts/assets - Advanced assets: https://docs-3.prefect.io/v3/advanced/assets
d
@Marvin do assets appear in the ui?
m
thought for 23.8 seconds
Yes. In Prefect 3, assets appear in the UI once they are materialized by a run. What you’ll see: - An “Assets” view listing each asset key (e.g., file:///mnt/shared/...). - For each asset: last materialized time/state, recent runs, properties (description, owners), and any metadata you added via
add_asset_metadata
. - Lineage: if you use
asset_deps=[...]
on a materializing task, the UI shows upstream/downstream relationships and a graph. Notes: - Assets only show up after at least one successful materialization (i.e., a function decorated with
@materialize(...)
runs). - The key must be a valid URI string; for local/shared volumes use absolute file URIs like
file:///mnt/shared/datasets/raw
. - You can enrich what you see in the UI with
AssetProperties
(description, owners) and
add_asset_metadata(...)
(e.g., partition, exact file URI). - Assets model metadata and lineage; the UI doesn’t store or preview the file contents. Docs: - Assets overview: https://docs-3.prefect.io/v3/concepts/assets - Advanced assets: https://docs-3.prefect.io/v3/advanced/assets If an asset doesn’t show up, check: - The task is decorated with
@materialize(...)
and actually ran. - The asset key is set with a keyword, e.g.,
Asset(key="file:///...")
. - You’re looking in the same workspace where the run executed.