Jesse
07/10/2025, 3:36 PM
`with_options` for dynamic assets with `.map`?
Marvin
07/10/2025, 3:36 PM
Marvin
07/10/2025, 3:37 PM
`@materialize` with mapping and configuration options in Prefect 3.x.
To use `@materialize` with `.map()` for dynamic assets, you'll need to:
1. Define your materialized function with the appropriate configuration
2. Use `.map()` to apply it across your inputs
Here's a complete example:
```python
from prefect import flow
from prefect.assets import materialize

# Assets are passed positionally as URI keys (placeholder shown); task options
# such as retries can be passed as extra keyword arguments
@materialize("s3://my-bucket/processed-data")
def process_data(input_data: str) -> str:
    # Process your data here
    return f"processed_{input_data}"

@flow
def main_flow():
    input_list = ["data1", "data2", "data3"]
    # Method 1: Using map directly with the materialized function
    results = process_data.map(input_list).result()
    # Method 2: Using with_options to modify task behavior for mapping
    configured_task = process_data.with_options(
        retries=2,
        retry_delay_seconds=60,
        name="configured-process-data",
    )
    results = configured_task.map(input_list).result()
    return results
```
Key points to remember:
1. Asset Names: When using `.map()`, Prefect will automatically create unique asset names for each mapped instance. If you provided `assets=["processed-data"]`, you'll get assets named like `processed-data-0`, `processed-data-1`, etc.
2. Configuration Options: You can use `with_options()` to configure:
- `retries`: Number of retry attempts
- `retry_delay_seconds`: Delay between retries
- `name`: Custom name for the task
- `tags`: Task tags
- `timeout_seconds`: Maximum execution time
- And other task-specific parameters
3. Order of Operations: Apply `with_options()` before `.map()`. The configuration will be applied to each mapped instance.
4. Parallel Execution: Mapped tasks will execute in parallel by default, subject to your flow's concurrency limits.
Example with more complex configuration:
```python
from prefect import flow
from prefect.assets import materialize

@materialize(
    "s3://my-bucket/data-asset",  # placeholder asset key (URI)
    description="Processes input data and creates versioned assets",
)
def process_data(data: dict) -> dict:
    return {"processed": data["value"]}

@flow
def process_multiple_datasets():
    input_data = [{"value": i} for i in range(5)]
    # Configure the task with specific options
    configured_process = process_data.with_options(
        retries=3,
        retry_delay_seconds=30,
        tags=["data-processing"],
        timeout_seconds=3600,
    )
    # Map over the input data with the configured task and wait for the results
    results = configured_process.map(input_data).result()
    return results
```
This pattern is particularly useful when you need to:
- Process multiple datasets in parallel
- Apply consistent error handling across all mapped executions
- Track each processed dataset as a separate asset
- Configure task behavior uniformly across all mapped instances
Remember that each mapped execution will create its own asset version, allowing you to track the lineage and history of each processed item independently.
Jesse
07/10/2025, 3:38 PM
`processed-data-0`, can I control the name?
Marvin
07/10/2025, 3:38 PM
Nate
07/10/2025, 3:40 PM
Marvin
07/10/2025, 3:40 PM
`.map()` applies the same options, including the assets, to every mapped run, so to give each item its own asset you build the asset key from your input data and call `with_options(assets=[...])` once per item, submitting each run with `.submit()`. Here's how to do it:
```python
from prefect import flow
from prefect.assets import Asset, AssetProperties, materialize

# The decorator needs at least one asset; this default key is a placeholder
# that each run below replaces via with_options(assets=...)
@materialize("s3://my-bucket/processed")
def process_data(data: dict) -> dict:
    return {"processed": data["value"]}

# Method 1: Build a string asset key per item
@flow
def process_multiple_datasets():
    input_data = [
        {"id": "A001", "value": 1},
        {"id": "A002", "value": 2},
    ]
    futures = [
        process_data.with_options(
            assets=[f"s3://my-bucket/processed-{data['id']}"]
        ).submit(data)
        for data in input_data
    ]
    return [future.result() for future in futures]

# Method 2: Use Asset instances for more control
@materialize("s3://my-bucket/processed")
def process_with_custom_assets(data: dict) -> dict:
    return {"processed": data["value"]}

@flow
def process_with_custom_assets_flow():
    input_data = [
        {"id": "A001", "value": 1},
        {"id": "A002", "value": 2},
    ]
    futures = []
    for data in input_data:
        # Create a custom Asset instance with descriptive properties
        asset = Asset(
            key=f"s3://my-bucket/processed-{data['id']}",
            properties=AssetProperties(description=f"Processed data for {data['id']}"),
        )
        futures.append(process_with_custom_assets.with_options(assets=[asset]).submit(data))
    return [future.result() for future in futures]
```
For even more control over asset properties, you can use this pattern:
```python
from prefect import flow
from prefect.artifacts import create_markdown_artifact
from prefect.assets import Asset, AssetProperties, add_asset_metadata, materialize

def asset_for(data: dict) -> Asset:
    # Hypothetical helper: one asset per input item, with descriptive properties
    return Asset(
        key=f"s3://my-bucket/processed-{data['id']}",
        properties=AssetProperties(
            name=f"processed-{data['id']}",
            description=f"Processed data for {data['id']}",
        ),
    )

# Placeholder default key; each run below replaces it via with_options(assets=...)
@materialize("s3://my-bucket/processed")
def process_with_full_metadata(data: dict) -> dict:
    result = {"processed": data["value"]}
    # Attach run metadata to the asset this run materializes
    add_asset_metadata(
        asset_for(data).key,
        {
            "source": data.get("source", "unknown"),
            "timestamp": data.get("timestamp"),
            "version": "1.0",
        },
    )
    # Create an artifact to document the processing
    create_markdown_artifact(
        f"Processed data for {data['id']}",
        description=f"Data processing completed for {data['id']}",
    )
    return result

@flow
def process_datasets_with_metadata():
    input_data = [
        {"id": "A001", "value": 1, "source": "sensor-1", "timestamp": "2023-01-01"},
        {"id": "A002", "value": 2, "source": "sensor-2", "timestamp": "2023-01-01"},
    ]
    futures = [
        process_with_full_metadata.with_options(assets=[asset_for(data)]).submit(data)
        for data in input_data
    ]
    return [future.result() for future in futures]
```
Key points to remember:
1. `.map()` applies a single set of options to every mapped run, so per-item asset keys (strings or `Asset` instances) need one `with_options(assets=[...])` call per item.
2. With `Asset` instances, `AssetProperties` lets you specify:
- A display name
- A description
- A URL
- Owners
Run-level metadata (source, timestamp, version) is attached from inside the task with `add_asset_metadata()`, using a key the run declared as one of its assets.
3. The asset keys should be unique per item within your flow run; runs that share a key write to the same asset.
4. You can combine this with `with_options()` for task configuration:
```python
@flow
def configured_process_flow():
    input_data = [{"id": f"A00{i}", "value": i} for i in range(3)]
    futures = [
        process_with_custom_assets.with_options(
            assets=[f"s3://my-bucket/processed-{data['id']}"],
            retries=2,
            name="custom-asset-processor",
        ).submit(data)
        for data in input_data
    ]
    return [future.result() for future in futures]
```
This approach gives you full control over asset naming and metadata while still running the items concurrently with `.submit()`.
Nate
07/10/2025, 3:45 PM
`processed-data-0` is the task run name, which you can control with `task_run_name`.
Jesse
07/10/2025, 3:45 PM
Nate
07/10/2025, 3:46 PM
Jesse
07/10/2025, 3:47 PM
`add_asset_metadata` won’t create an asset by key if that key doesn’t exist in the decorator?
Jesse
07/10/2025, 3:47 PM
Jesse
07/10/2025, 3:49 PM
`.map` approach for concurrency, and call `with_options` for each run
Jesse
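07/10/2025, 3:49 PM
```python
from prefect import flow
from prefect.futures import wait

@flow
def my_workflow():
    # cool_task is Jesse's own @task (not shown)
    futures = [cool_task.submit() for _ in range(10)]
    wait(futures)
```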
Jesse
07/15/2025, 2:52 PM
Kevin Grismore
07/15/2025, 2:53 PM