Darius Tan
08/25/2025, 3:12 AM
@task
def task_handler(task_path: str, task_params: dict, input_data: Any) -> Any:
    """
    Dynamically imports and executes a single task module.
    """
    task_class = import_class(task_path)
    task_instance = task_class()
    if input_data is not None:
        return task_instance.run(input_data, **task_params)
    else:
        return task_instance.run(**task_params)
@flow
def workflow_executor(task_sequence: List[Dict[str, Any]]):
    """
    Dynamically builds and runs a workflow from a given task sequence definition.
    """
    intermediate_data = None
    for task_config in task_sequence:
        intermediate_data = task_handler(
            task_path=task_config["path"],
            task_params=task_config["params"],
            input_data=intermediate_data,
        )
    return intermediate_data
# Constants and deployment
WORKFLOW_NAME = workflow_executor.name
workflow_deployment = await workflow_executor.ato_deployment(
    name="DEPLOYMENT_IDENTIFIER",
    work_pool_name="WORK_POOL_IDENTIFIER",
    entrypoint_type=EntrypointType.MODULE_PATH,
)
await workflow_deployment.apply()
# Client interaction
async with get_client() as client:
    try:
        deployment_info = await client.read_deployment_by_name(f"{WORKFLOW_NAME}/DEPLOYMENT_IDENTIFIER")
        workflow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment_info.id,
            parameters={"task_sequence": [task.model_dump() for task in payload.task_sequence]},
        )
        run_link = f"http://localhost:4200/flow-runs/flow-run/{workflow_run.id}"
This allows for direct passing of data between subsequent tasks using upstream_data. Is this a Prefect-native way of doing things? What if I would like to pass files / objects between tasks instead? What is the Prefect-native way to pass such data?
Marvin
08/25/2025, 3:12 AM
Marvin
08/25/2025, 3:21 AM
1) Yes, your pattern (returning intermediate_data from each task and passing it into the next) is idiomatic Prefect.
- Tip for observability: give the generic “handler” task a dynamic name so the UI is more readable.
@task(task_run_name=lambda task_path, **_: f"handler[{task_path}]")
def task_handler(task_path: str, task_params: dict, input_data: Any) -> Any:
...
2) Large objects: enable Prefect result persistence
- Let Prefect persist task results to a filesystem/object store and (optionally) avoid keeping big payloads in memory.
- Configure per-task:
from prefect import task, flow
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import CompressedPickleSerializer
remote_fs = RemoteFileSystem(basepath="s3://my-bucket/prefect/results")

@task(
    persist_result=True,
    result_storage=remote_fs,
    result_serializer=CompressedPickleSerializer(),
    cache_result_in_memory=False,  # force downstreams to re-read from storage
)
def produce_large_object():
    # return a big object (e.g., DataFrame, bytes)
    ...

@task
def consume_large_object(obj):
    ...

@flow
def my_flow():
    big = produce_large_object()
    consume_large_object(big)
- Docs: Results and persistence in Prefect 3
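If most of a flow's tasks should persist results the same way, the same options can also be set once on the flow decorator rather than per task. A minimal sketch, assuming a shared volume mounted at /mnt/shared (the path and serializer choice are illustrative):
```
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import CompressedPickleSerializer

# Hypothetical shared volume path; adjust to your environment
shared_fs = LocalFileSystem(basepath="/mnt/shared/prefect-results")

@task
def make_payload() -> dict:
    return {"rows": list(range(1_000_000))}

@flow(
    persist_result=True,
    result_storage=shared_fs,
    result_serializer=CompressedPickleSerializer(),
)
def persistent_flow():
    # Flow results (and, typically, tasks that don't set their own storage)
    # are written under /mnt/shared/prefect-results
    return make_payload()
```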
3) Large files: write to object storage and pass references
- For true “files”, write to S3/GCS/etc. and pass just the key/path:
import uuid
from prefect import task, flow
from prefect_aws.s3 import upload_to_bucket, download_from_bucket, AwsCredentials
creds = AwsCredentials.from_environment()
@task
def write_file_to_s3(data: bytes) -> str:
    key = f"runs/{uuid.uuid4()}/data.bin"
    upload_to_bucket(data=data, bucket="my-bucket", aws_credentials=creds, key=key)
    return key

@task
def read_file_from_s3(key: str) -> bytes:
    return download_from_bucket(bucket="my-bucket", key=key, aws_credentials=creds)

@flow
def file_flow():
    key = write_file_to_s3(b"some big payload")
    data = read_file_from_s3(key)
- This pattern avoids moving large payloads through task boundaries and works well across processes/containers.
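If you prefer Prefect's storage blocks to hand-rolled helpers, the same reference-passing pattern can be written against an S3Bucket block. A minimal sketch, assuming prefect-aws is installed and an S3Bucket block named "my-bucket-block" has already been saved:
```
import uuid
from prefect import task, flow
from prefect_aws import S3Bucket

@task
def write_file(data: bytes) -> str:
    s3 = S3Bucket.load("my-bucket-block")  # pre-configured block (assumed name)
    key = f"runs/{uuid.uuid4()}/data.bin"
    s3.write_path(key, data)  # upload bytes under the bucket's base path
    return key                # pass only the reference downstream

@task
def read_file(key: str) -> bytes:
    s3 = S3Bucket.load("my-bucket-block")
    return s3.read_path(key)  # download bytes when and where they are needed

@flow
def file_flow():
    key = write_file(b"some big payload")
    return read_file(key)
```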
A note on your dynamic “task module” pattern
- It’s perfectly fine, but be aware the UI will show repeated runs of the same task_handler. The task_run_name tip above helps.
- If you want per-step observability with parameters/lineage, consider wrapping each unit as a proper @task or a small subflow, or augment your handler to emit artifacts/metadata (a sketch follows below).
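For the artifact route, here is a minimal sketch of a handler that records what it executed via create_markdown_artifact (it reuses your existing import_class helper; the artifact key is just an example):
```
from typing import Any
from prefect import task
from prefect.artifacts import create_markdown_artifact

@task
def task_handler(task_path: str, task_params: dict, input_data: Any) -> Any:
    task_class = import_class(task_path)  # your existing dynamic-import helper
    instance = task_class()
    result = instance.run(input_data, **task_params) if input_data is not None else instance.run(**task_params)
    # Emit a small artifact so each handler run shows what it actually executed
    create_markdown_artifact(
        key="handler-step",  # artifact keys must be lowercase letters, numbers, and dashes
        markdown=f"Executed `{task_path}` with params `{task_params}`",
    )
    return result
```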
Programmatic deployments in Prefect 3.x (fixes to your snippet)
- There’s no to_deployment or ato_deployment in 3.x. Use Flow.deploy(...) or Flow.from_source(...).deploy(...).
- Simple in-code deployment:
from prefect import flow
from prefect.types.entrypoint import EntrypointType
@flow
def workflow_executor(task_sequence: list[dict]):
    ...

deployment_id = workflow_executor.deploy(
    name="DEPLOYMENT_IDENTIFIER",
    work_pool_name="WORK_POOL_IDENTIFIER",
    entrypoint_type=EntrypointType.MODULE_PATH,  # optional; Prefect will infer in many cases
)
- From remote source:
from prefect.flows import Flow

flow_obj = await Flow.from_source(
    source="https://github.com/org/repo",
    entrypoint="path/to/file.py:workflow_executor",
)
deployment_id = await flow_obj.deploy(
    name="DEPLOYMENT_IDENTIFIER",
    work_pool_name="WORK_POOL_IDENTIFIER",
)
- To trigger a deployment from Python without managing the client directly you can also use:
from prefect.deployments import run_deployment
flow_run = await run_deployment(
    name="workflow_executor/DEPLOYMENT_IDENTIFIER",
    parameters={"task_sequence": [...]},
)
- Deployment docs: Create deployments in Prefect 3
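If you also need to block until the triggered run finishes, one option is to poll its state through the same client. A minimal sketch (the helper name and polling interval are illustrative):
```
import asyncio
from prefect.client.orchestration import get_client

async def wait_for_run(flow_run_id) -> bool:
    # Poll the API until the flow run reaches a terminal state
    async with get_client() as client:
        while True:
            run = await client.read_flow_run(flow_run_id)
            if run.state and run.state.is_final():
                return run.state.is_completed()
            await asyncio.sleep(5)  # illustrative polling interval

# e.g. completed = await wait_for_run(workflow_run.id)
```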
Questions to tailor a concrete recommendation:
- How big are the files/objects you want to pass?
- Where do you want to store them (S3, GCS, Azure, shared volume, local disk)?
- Do your tasks ever run in separate processes/containers where in-memory handoff is not viable?
With those answers I can sketch an exact config (result storage vs object store, serializers, and any collection code you’ll need).
Darius Tan
08/25/2025, 5:23 AM
Encountered exception during execution: TypeError("<lambda>() missing 1 required positional argument: 'task_path'")
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 765, in run_context
yield self
File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 1373, in run_flow_sync
engine.call_flow_fn()
File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 785, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
File "/home/xx/flows.py", line 25, in workflow_executor
intermediate_data = task_handler(
File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 1139, in __call__
return run_task(
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 1714, in run_task
return run_task_sync(**kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 1479, in run_task_sync
with engine.start(task_run_id=task_run_id, dependencies=dependencies):
File "/usr/local/lib/python3.10/contextlib.py", line 135, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 790, in start
self._set_custom_task_run_name()
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 215, in _set_custom_task_run_name
task_run_name = resolve_custom_task_run_name(
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/_engine.py", line 69, in resolve_custom_task_run_name
task_run_name = task.task_run_name()
TypeError: <lambda>() missing 1 required positional argument: 'task_path'
2. What about assets or materialize? Why not use these to pass data between tasks?
Marvin
08/25/2025, 5:23 AM
Marvin
08/25/2025, 5:27 AM
1) The task_run_name callable is not called with your task's arguments, so lambda task_path, **_ fails.
Use either a format string or a one-arg callable that accepts a parameters dict:
- Simplest (format string):
@task(task_run_name="handler[{task_path}]")
def task_handler(task_path: str, task_params: dict, input_data: Any) -> Any:
...
- Or a dict-arg callable:
@task(task_run_name=lambda params: f"handler[{params.get('task_path', '?')}]")
def task_handler(task_path: str, task_params: dict, input_data: Any) -> Any:
...
Under the hood, Prefect resolves the task run name using the parameters dict. That’s why a callable expecting task_path as a positional argument fails.
2) Should I use assets/materialize to pass data?
Think of assets as a way to model and track real data products (tables, files, etc.) and their lineage in the UI — not as a byte transport between tasks.
- Use assets to declare “this task materializes s3://bucket/path.parquet” and “that task depends on that asset”.
- Still use one of:
- Return values for small/medium Python objects
- Result persistence for large Python objects (store to filesystem/object store and pass references)
- Explicit object storage I/O for files (write/read S3, GCS, etc., pass keys/paths)
A light example showing how assets fit in with passing references:
from prefect import flow
from prefect.assets import Asset, materialize, add_asset_metadata
from prefect_aws.s3 import upload_to_bucket, download_from_bucket, AwsCredentials
raw_asset = Asset("s3://my-bucket/data/raw.parquet")
clean_asset = Asset("s3://my-bucket/data/clean.parquet")
creds = AwsCredentials.from_environment()

@materialize(raw_asset, by="extract")
def extract_raw() -> str:
    # write the raw file and return the key
    key = "data/raw.parquet"
    upload_to_bucket(data=b"...", bucket="my-bucket", aws_credentials=creds, key=key)
    add_asset_metadata(raw_asset, {"s3_key": key})
    return key

@materialize(clean_asset, by="transform", asset_deps=[raw_asset])
def transform_to_clean(raw_key: str) -> str:
    raw_bytes = download_from_bucket(bucket="my-bucket", key=raw_key, aws_credentials=creds)
    # transform...
    clean_key = "data/clean.parquet"
    upload_to_bucket(data=b"...", bucket="my-bucket", aws_credentials=creds, key=clean_key)
    add_asset_metadata(clean_asset, {"s3_key": clean_key})
    return clean_key

@flow
def pipeline():
    raw_key = extract_raw()
    clean_key = transform_to_clean(raw_key)
    return clean_key
Key takeaways:
- Assets give you lineage, discoverability, and UI integration for data products.
- They don’t “carry” the data; you still persist to storage and pass references (keys/paths) or use Prefect results for automatic persistence.
Links:
- Results and persistence: https://docs-3.prefect.io/v3/advanced/results
- Assets overview: https://docs-3.prefect.io/v3/concepts/assets
- Assets advanced usage: https://docs-3.prefect.io/v3/advanced/assets
If you share where your files live (S3/GCS/Azure/local) and typical sizes, I can suggest a concrete pattern (result storage config vs. explicit object-store reads/writes) and serializer choices.
Darius Tan
08/25/2025, 5:34 AM
Marvin
08/25/2025, 5:34 AM
Marvin
08/25/2025, 5:35 AM
Before (@task):
from prefect import task, flow
@task(retries=3)
def write_parquet(df) -> str:
    key = "data/raw.parquet"
    # write df -> s3://my-bucket/data/raw.parquet
    return key

@flow
def pipeline(df):
    key = write_parquet(df)
    return key
After (materialize):
from prefect import flow
from prefect.assets import Asset, materialize, add_asset_metadata
raw_asset = Asset("s3://my-bucket/data/raw.parquet")

@materialize(raw_asset, by="pandas", retries=3, task_run_name="materialize[raw.parquet]")
def write_parquet(df) -> str:
    key = "data/raw.parquet"
    # write df -> s3://my-bucket/data/raw.parquet
    add_asset_metadata(raw_asset, {"s3_key": key})
    return key

@flow
def pipeline(df):
    key = write_parquet(df)
    return key
Notes and tips
- You can pass multiple assets if the function materializes more than one: @materialize(asset_a, asset_b, ...) (see the sketch after these notes).
- You can declare upstream asset lineage with asset_deps via task kwargs:
@materialize(clean_asset, asset_deps=[raw_asset])
def transform(...): ...
- You can still enable Prefect result persistence or use object storage I/O exactly as with @task:
@materialize(raw_asset, persist_result=True, result_storage=remote_fs, cache_result_in_memory=False)
def produce_big_obj(...): ...
- If your function does not correspond to a durable data product (pure compute, API call, control logic), keep using @task.
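Putting the first two tips together, a minimal sketch of one function that materializes two assets and declares an upstream dependency (the asset keys and names are illustrative):
```
from prefect.assets import Asset, materialize, add_asset_metadata

# Hypothetical asset keys for illustration
raw_asset = Asset(key="s3://my-bucket/data/raw.parquet")
summary_asset = Asset(key="s3://my-bucket/data/summary.json")
report_asset = Asset(key="s3://my-bucket/data/report.html")

@materialize(summary_asset, report_asset, asset_deps=[raw_asset], by="reporting")
def build_outputs(raw_key: str) -> dict:
    # One task run records materializations for both downstream assets
    # ... read raw_key, compute the summary and report, write both files ...
    add_asset_metadata(summary_asset, {"source": raw_key})
    add_asset_metadata(report_asset, {"source": raw_key})
    return {"summary": "data/summary.json", "report": "data/report.html"}
```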
Docs
- Assets overview: https://docs-3.prefect.io/v3/concepts/assets
- Advanced assets: https://docs-3.prefect.io/v3/advanced/assets
- Results and persistence: https://docs-3.prefect.io/v3/advanced/results
Marvin
08/25/2025, 5:47 AM
Marvin
08/25/2025, 5:47 AM
- Possible to replace @task with @materialize?
- Possible to pass data directly between tasks using assets (without storing files/objects)?
- Possible to use materialize for non-durable compute steps?
- Possible to use materialize without defining Asset keys?
- Possible to combine task_run_name formatting with materialize?
- Something else?
If you can clarify which scenario you’re asking about (and maybe a short code snippet), I’ll give a concrete example.
Darius Tan
08/25/2025, 5:48 AM
Marvin
08/25/2025, 5:48 AM
Marvin
08/25/2025, 5:49 AM
1) Use a single partitioned asset and record each partition via metadata
from prefect.assets import Asset, materialize, add_asset_metadata
dataset = Asset("s3://my-bucket/dataset")

@materialize(dataset, by="writer")
def write_partition(partition: str, data: bytes) -> str:
    key = f"dataset/{partition}.parquet"
    # write data -> s3://my-bucket/{key}
    add_asset_metadata(dataset, {"partition": partition, "s3_key": key})
    return key
2) Build a materializing task dynamically for each asset key
- Create the Asset object and wrap your function with materialize(...) at runtime via a small factory. This gives you a new MaterializingTask per asset key.
from prefect import flow
from prefect.assets import Asset, materialize
def build_materializer(asset_key: str, **task_kwargs):
    asset = Asset(asset_key)

    def decorator(fn):
        return materialize(asset, **task_kwargs)(fn)

    return decorator

def make_writer(asset_key: str):
    @build_materializer(asset_key, by="writer", task_run_name=f"materialize[{asset_key}]")
    def write_bytes(data: bytes) -> str:
        # write bytes to asset_key (e.g., S3 path derived from asset_key)
        return asset_key

    return write_bytes

@flow
def pipeline(keys: list[str]):
    for k in keys:
        writer = make_writer(f"s3://my-bucket/{k}")
        writer(b"...")
Notes and caveats
- Dynamic factory creates new task objects at runtime; this is fine, but be mindful of generating lots of unique task definitions (naming helps readability).
- If you only need to record where data went (and not create many separate assets), the partitioned approach (Option 1) is simpler and keeps lineage compact.
- add_asset_metadata augments metadata; you still need a materializing task execution for Prefect to record a materialization event.
- Assets model data products and lineage; they don’t replace how bytes move. For large data, still persist to storage and pass references.
Docs
- Assets overview: https://docs-3.prefect.io/v3/concepts/assets
- Advanced assets: https://docs-3.prefect.io/v3/advanced/assets
Darius Tan
08/25/2025, 5:55 AM
Marvin
08/25/2025, 5:55 AM
Marvin
08/25/2025, 5:56 AM
from __future__ import annotations
import fsspec
from pathlib import Path
from urllib.parse import urlparse
from prefect import flow, task
from prefect.assets import Asset, materialize, add_asset_metadata
# Canonical dataset roots as assets (partitioned by filename)
raw_dataset = Asset("file:///mnt/shared/datasets/raw")
clean_dataset = Asset("file:///mnt/shared/datasets/clean")
def ensure_parent_dir(uri: str) -> None:
    """
    Create parent directory for a file:// URI on the local filesystem.
    """
    p = urlparse(uri)
    if p.scheme != "file":
        return
    Path(p.path).parent.mkdir(parents=True, exist_ok=True)

@materialize(raw_dataset, by="writer", task_run_name=lambda params: f"materialize[raw:{params.get('partition', '?')}]")
def write_raw_partition(partition: str, data: bytes) -> str:
    """
    Write a 'partition' as a file under the raw dataset root and return the file:// URI.
    """
    target_uri = f"{raw_dataset.key}/{partition}.bin"  # e.g. file:///mnt/shared/datasets/raw/2025-08-25.bin
    ensure_parent_dir(target_uri)
    with fsspec.open(target_uri, "wb") as f:
        f.write(data)
    # Record which partition and exact URI we materialized
    add_asset_metadata(raw_dataset, {"partition": partition, "uri": target_uri})
    return target_uri  # Pass URIs downstream

@task
def read_uri(uri: str) -> bytes:
    with fsspec.open(uri, "rb") as f:
        return f.read()

@materialize(
    clean_dataset,
    by="transform",
    asset_deps=[raw_dataset],
    task_run_name=lambda params: f"materialize[clean:from={Path(urlparse(params.get('raw_uri', '')).path).name or '?'}]",
)
def transform_partition(raw_uri: str) -> str:
    """
    Read a raw partition by URI, transform it, write a new 'clean' partition, and return its URI.
    """
    # Derive partition name from the source file
    partition_name = Path(urlparse(raw_uri).path).stem  # e.g. '2025-08-25'
    clean_uri = f"{clean_dataset.key}/{partition_name}.bin"
    raw_bytes = read_uri(raw_uri)
    transformed = raw_bytes.upper()  # placeholder transform
    ensure_parent_dir(clean_uri)
    with fsspec.open(clean_uri, "wb") as f:
        f.write(transformed)
    add_asset_metadata(clean_dataset, {"partition": partition_name, "uri": clean_uri})
    return clean_uri

@flow
def pipeline(partitions: list[str]):
    # Write raw partitions and collect their URIs
    raw_uris = [write_raw_partition(p, f"payload-for-{p}".encode()) for p in partitions]
    # Transform each raw partition into a clean partition
    clean_uris = [transform_partition(u) for u in raw_uris]
    # Optionally read/inspect a result
    sample = read_uri(clean_uris[0])
    return {"raw": raw_uris, "clean": clean_uris, "sample": sample}

if __name__ == "__main__":
    # Example run
    pipeline(["2025-08-25", "2025-08-26"])
Notes
- Asset keys are URIs. For local/shared volumes, use file:///absolute/path. Example: file:///mnt/shared/datasets/raw.
- The functions still pass data via return values — in this case, the returned values are file:// URIs. That’s the “native” and scalable way to hand off large objects.
- materialize records lineage and metadata for the canonical dataset asset; partitions are tracked via your metadata (e.g., partition name, exact file URI).
Marvin
08/25/2025, 5:56 AM
Darius Tan
08/25/2025, 8:10 AM
Traceback (most recent call last):
File "flows.py", line 9, in <module>
my_asset = Asset("file:///mnt/shared/some_path")
TypeError: BaseModel.__init__() takes 1 positional argument but 2 were given
Marvin
08/25/2025, 8:10 AM
Marvin
08/25/2025, 8:10 AM
Asset is a Pydantic model, so the key must be passed as a keyword argument.
- Wrong:
my_asset = Asset("file:///mnt/shared/some_path")
- Right:
from prefect.assets import Asset
my_asset = Asset(key="file:///mnt/shared/some_path")
# optionally:
# from prefect.assets import AssetProperties
# my_asset = Asset(key="file:///mnt/shared/some_path", properties=AssetProperties(description="..."))
A few more notes:
- Use a proper file URI with three slashes and an absolute path: file:///mnt/shared/some_path.
- Your task_run_name usage with materialize is fine; just ensure the parameter you reference exists in the function signature.
Example putting it together:
from prefect.assets import Asset, materialize
my_asset = Asset(key="file:///mnt/shared/some_path")
@materialize(my_asset, task_run_name="name[{task_path}]")
def my_task(task_path: str, payload: dict):
    # do work...
    return "ok"
If you prefer a callable for task_run_name, it must accept a single parameters dict:
@materialize(my_asset, task_run_name=lambda params: f"name[{params.get('task_path','?')}]")
def my_task(task_path: str, payload: dict):
...
Docs:
- Assets overview: https://docs-3.prefect.io/v3/concepts/assets
- Advanced assets: https://docs-3.prefect.io/v3/advanced/assets
Darius Tan
08/27/2025, 9:26 AM
Marvin
08/27/2025, 9:26 AM
Marvin
08/27/2025, 9:27 AM
- Metadata: anything you record with add_asset_metadata.
- Lineage: if you use asset_deps=[...] on a materializing task, the UI shows upstream/downstream relationships and a graph.
Notes:
- Assets only show up after at least one successful materialization (i.e., a function decorated with @materialize(...) runs).
- The key must be a valid URI string; for local/shared volumes use absolute file URIs like file:///mnt/shared/datasets/raw.
- You can enrich what you see in the UI with AssetProperties (description, owners) and add_asset_metadata(...) (e.g., partition, exact file URI); see the sketch after these notes.
- Assets model metadata and lineage; the UI doesn’t store or preview the file contents.
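For example, a minimal sketch of attaching AssetProperties so the asset page shows a description and owners (the key, description, and owner values are illustrative):
```
from prefect.assets import Asset, AssetProperties, materialize, add_asset_metadata

raw_dataset = Asset(
    key="file:///mnt/shared/datasets/raw",
    properties=AssetProperties(
        description="Raw partitions written by the ingestion flow",
        owners=["data-platform"],  # illustrative owner handle
    ),
)

@materialize(raw_dataset)
def write_raw(partition: str) -> str:
    uri = f"{raw_dataset.key}/{partition}.bin"
    # ... write the file ...
    add_asset_metadata(raw_dataset, {"partition": partition, "uri": uri})
    return uri
```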
Docs:
- Assets overview: https://docs-3.prefect.io/v3/concepts/assets
- Advanced assets: https://docs-3.prefect.io/v3/advanced/assets
If an asset doesn’t show up, check:
- The task is decorated with @materialize(...) and actually ran.
- The asset key is set with a keyword, e.g., Asset(key="file:///...").
- You’re looking in the same workspace where the run executed.
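As a quick sanity check, a minimal sketch you could run to confirm assets appear in your workspace (the file path is illustrative):
```
from pathlib import Path
from prefect import flow
from prefect.assets import Asset, materialize

smoke_asset = Asset(key="file:///tmp/prefect-asset-smoke-test.txt")

@materialize(smoke_asset)
def write_smoke_file() -> str:
    # Write a tiny file so the materialization has a real data product behind it
    path = Path("/tmp/prefect-asset-smoke-test.txt")
    path.write_text("hello assets")
    return str(path)

@flow
def asset_smoke_test():
    return write_smoke_file()

if __name__ == "__main__":
    asset_smoke_test()
    # After this run completes, the asset should be listed under Assets in the UI
```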