Mike B
07/02/2025, 3:36 AMMarvin
07/02/2025, 3:36 AMMarvin
07/02/2025, 3:38 AM@materialize
! This is a supported feature in Prefect 3.x. Here's how you can do it:
python
from prefect import flow, task
from prefect.assets import materialize
@task
def process_data(data_id: str) -> str:
return f"Processed data for {data_id}"
@flow
def data_processing_flow(data_id: str):
# The URL can be dynamically constructed using the parameter
result = materialize(f"<s3://my-bucket/{data_id}>")(process_data)(data_id)
return result
Key points about dynamic materialization:
1. The URL is evaluated at runtime, so you can use any variables or parameters available in your flow
2. You can construct the URL using string formatting or any other string manipulation
3. This works with any supported storage protocol (s3://, gcs://, etc.)
You can also use this pattern with asset dependencies:
python
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash)
def fetch_data(source_id: str) -> str:
return f"Data from {source_id}"
@flow
def process_with_dependencies(source_id: str, target_id: str):
# Dynamic source asset
data = fetch_data.with_options(asset_deps=[f"source://{source_id}"])(source_id)
# Dynamic target location
result = materialize(f"target://{target_id}")(process_data)(data)
return result
This flexibility allows you to create dynamic data pipelines where both the source and target locations can be determined at runtime based on your flow's parameters.Kevin Grismore
07/02/2025, 3:57 AMmy_materializing _task.with_options(assets=[...])()