<@ULVA73B9P>: How can I use `@materialize`’s `with...
# marvin-ai
j
@Marvin: How can I use `@materialize`’s `with_options` for dynamic assets with `.map`?
m
thought for 73.0 seconds
Let me provide a clear explanation of how to use `@materialize` with mapping and configuration options in Prefect 3.x. To use `@materialize` with `.map()` for dynamic assets, you'll need to:
1. Define your materialized function with the appropriate configuration
2. Use `.map()` to apply it across your inputs

Here's a complete example:
```python
from prefect import flow
from prefect.assets import materialize

# Assets are passed positionally as URI-style keys; the bucket is a placeholder.
# Extra task options (retries, tags, ...) can also be passed here as keyword arguments.
@materialize("s3://my-bucket/processed-data")
def process_data(input_data: str) -> str:
    # Process your data here
    return f"processed_{input_data}"

@flow
def main_flow():
    input_list = ["data1", "data2", "data3"]

    # Method 1: Using map directly with the materialized function
    results = process_data.map(input_list).result()

    # Method 2: Using with_options to modify task behavior for mapping
    configured_task = process_data.with_options(
        retries=2,
        retry_delay_seconds=60,
        name="configured-process-data",
    )
    results = configured_task.map(input_list).result()

    return results
```
Key points to remember:
1. Asset Names: When using `.map()`, Prefect will automatically create unique asset names for each mapped instance. If you declared a `processed-data` asset, you'll get assets named like `processed-data-0`, `processed-data-1`, etc.
2. Configuration Options: You can use `with_options()` to configure:
   - `retries`: Number of retry attempts
   - `retry_delay_seconds`: Delay between retries
   - `name`: Custom name for the task
   - `tags`: Task tags
   - `timeout_seconds`: Maximum execution time
   - And other task-specific parameters
3. Order of Operations: Apply `with_options()` before `.map()`. The configuration will be applied to each mapped instance.
4. Concurrent Execution: Mapped tasks run concurrently by default, subject to your flow's task runner and any concurrency limits.

Example with more complex configuration:
```python
from prefect import flow
from prefect.assets import materialize

@materialize(
    "s3://my-bucket/data-asset",  # placeholder URI-style asset key
    description="Processes input data and creates versioned assets",
)
def process_data(data: dict) -> dict:
    return {"processed": data["value"]}

@flow
def process_multiple_datasets():
    input_data = [{"value": i} for i in range(5)]

    # Configure the task with specific options
    configured_process = process_data.with_options(
        retries=3,
        retry_delay_seconds=30,
        tags=["data-processing"],
        timeout_seconds=3600,
    )

    # Map over the input data with the configured task
    results = configured_process.map(input_data)
    return results.result()
```
This pattern is particularly useful when you need to:
- Process multiple datasets in parallel
- Apply consistent error handling across all mapped executions
- Track each processed dataset as a separate asset
- Configure task behavior uniformly across all mapped instances

Remember that each mapped execution will create its own asset version, allowing you to track the lineage and history of each processed item independently.
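To make "each mapped execution" concrete: `.map()` submits one task run per element of its iterable arguments and pairs those arguments up position by position. Prefect's `unmapped(...)` wrapper keeps an argument fixed across calls. The plain-Python sketch below imitates that fan-out without Prefect. `fan_out` is a hypothetical helper. Raising an error on iterables of unequal length is a choice made for this sketch.

```python
from typing import Any, Callable, Iterable


def fan_out(fn: Callable[..., Any], *iterables: Iterable[Any], **static: Any) -> list[Any]:
    """Call fn once per position across `iterables` (similar in spirit to Task.map).

    Keyword arguments in `static` are passed unchanged to every call,
    playing the role of prefect.unmapped(...). Hypothetical helper.
    """
    columns = [list(it) for it in iterables]
    lengths = {len(column) for column in columns}
    if len(lengths) > 1:
        raise ValueError(f"iterables have different lengths: {sorted(lengths)}")
    # zip(*columns) yields one argument tuple per "mapped run"
    return [fn(*args, **static) for args in zip(*columns)]


def process_data(input_data: str, prefix: str = "processed_") -> str:
    return f"{prefix}{input_data}"


results = fan_out(process_data, ["data1", "data2", "data3"])
# results == ["processed_data1", "processed_data2", "processed_data3"]
```

Every call gets the same function and the same static options. This mirrors why a single `with_options(...)` applied before `.map()` configures all mapped runs identically.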
j
@Marvin: Instead of the auto-generated asset names, like `processed-data-0`, can I control the name?
m
thought for 81.9 seconds
n
correct me if I'm wrong @Kevin Grismore, but I don't think we would ever auto-generate asset names
m
Yes, you can control the asset names when using `.map()` by dynamically creating the asset names based on your input data. Here's how to do it:
```python
from prefect import flow
from prefect.assets import materialize, Asset

# Method 1: Using a function to generate asset names
@materialize
def process_data(data: dict) -> dict:
    # Create asset name dynamically using the data
    asset_name = f"processed-{data['id']}"
    return materialize.with_assets([asset_name])(lambda: {"processed": data["value"]})()

@flow
def process_multiple_datasets():
    input_data = [
        {"id": "A001", "value": 1},
        {"id": "A002", "value": 2},
    ]
    results = process_data.map(input_data)
    return results

# Method 2: Using Asset instances for more control
@materialize
def process_with_custom_assets(data: dict) -> dict:
    # Create a custom Asset instance
    asset = Asset(
        f"processed-{data['id']}",
        description=f"Processed data for {data['id']}"
    )
    return materialize.with_assets([asset])(lambda: {"processed": data["value"]})()

@flow
def process_with_custom_assets_flow():
    input_data = [
        {"id": "A001", "value": 1},
        {"id": "A002", "value": 2},
    ]
    results = process_with_custom_assets.map(input_data)
    return results
```
For even more control over asset properties, you can use this pattern:
```python
from prefect import flow
from prefect.assets import materialize, Asset
from prefect.artifacts import create_markdown_artifact

@materialize
def process_with_full_metadata(data: dict) -> dict:
    # Create an Asset with full metadata
    asset = Asset(
        f"processed-{data['id']}",
        description=f"Processed data for {data['id']}",
        metadata={
            "source": data.get("source", "unknown"),
            "timestamp": data.get("timestamp"),
            "version": "1.0"
        }
    )

    result = {"processed": data["value"]}

    # Create an artifact to document the processing
    create_markdown_artifact(
        f"Processed data for {data['id']}",
        description=f"Data processing completed for {data['id']}"
    )

    return materialize.with_assets([asset])(lambda: result)()

@flow
def process_datasets_with_metadata():
    input_data = [
        {
            "id": "A001",
            "value": 1,
            "source": "sensor-1",
            "timestamp": "2023-01-01"
        },
        {
            "id": "A002",
            "value": 2,
            "source": "sensor-2",
            "timestamp": "2023-01-01"
        }
    ]
    results = process_with_full_metadata.map(input_data)
    return results
```
Key points to remember:
1. Use `materialize.with_assets()` inside your function to dynamically specify asset names or Asset instances.
2. When using Asset instances, you can specify:
   - Custom names
   - Descriptions
   - Metadata
   - Tags
   - Other properties that help with organization and tracking
3. The asset names must be unique within your flow run, so ensure your naming scheme produces unique names for each mapped iteration.
4. You can combine this with `with_options()` for task configuration:
```python
@flow
def configured_process_flow():
    input_data = [{"id": f"A00{i}", "value": i} for i in range(3)]

    configured_task = process_with_custom_assets.with_options(
        retries=2,
        name="custom-asset-processor"
    )

    results = configured_task.map(input_data)
    return results
```
This approach gives you full control over asset naming and metadata while maintaining the benefits of parallel execution with `.map()`.
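Point 3 above asks for a naming scheme that gives each mapped item its own asset. Below is a minimal sketch of such a scheme. It assumes URI-style keys, since Prefect asset keys look like `s3://bucket/path`. `asset_keys_for` and the `s3://my-bucket/processed` prefix are hypothetical. The helper fails fast on duplicates rather than letting two runs claim the same asset.

```python
from urllib.parse import quote


def asset_keys_for(items: list[dict], base_uri: str) -> list[str]:
    """Build one URI-style asset key per item from its "id" field.

    Hypothetical helper: base_uri is a placeholder such as
    "s3://my-bucket/processed". Raises ValueError if base_uri has no
    scheme or if two items would produce the same key.
    """
    if "://" not in base_uri:
        raise ValueError(f"expected a URI such as s3://bucket/path, got {base_uri!r}")
    prefix = base_uri.rstrip("/")
    # Percent-encode ids so characters like "/" cannot create nested paths
    keys = [f"{prefix}/{quote(str(item['id']), safe='')}" for item in items]
    duplicates = sorted({key for key in keys if keys.count(key) > 1})
    if duplicates:
        raise ValueError(f"duplicate asset keys: {duplicates}")
    return keys


keys = asset_keys_for([{"id": "A001"}, {"id": "A002"}], "s3://my-bucket/processed")
# keys == ["s3://my-bucket/processed/A001", "s3://my-bucket/processed/A002"]
```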
n
to clarify, this `processed-data-0` is the task run name, which you can control with `task_run_name`
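Following up on `task_run_name`: a string template is filled in from each call's parameters, which is how mapped runs can get readable per-item names. The sketch below approximates that with plain `str.format`. `preview_task_run_name` is a hypothetical helper, and the sketch assumes Prefect formats the template the same way, which is worth confirming for your version. In a flow, the template itself would be passed as something like `process_data.with_options(task_run_name="process-{input_data}")`.

```python
def preview_task_run_name(template: str, **parameters: object) -> str:
    """Approximate how a string task_run_name template is resolved.

    Assumes the template is filled with str.format using the task call's
    parameters, so "{input_data}" refers to the input_data argument.
    """
    return template.format(**parameters)


names = [
    preview_task_run_name("process-{input_data}", input_data=value)
    for value in ["data1", "data2", "data3"]
]
# names == ["process-data1", "process-data2", "process-data3"]

# str.format also supports item access, e.g. for dict parameters
name = preview_task_run_name("process-{data[id]}", data={"id": "A001", "value": 1})
# name == "process-A001"
```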
j
Fair enough!
n
today assets must be explicitly declared, which includes the name
j
Okay!
`add_asset_metadata` won’t create an asset by key if that key doesn’t exist in the decorator?
I suppose I could use a non-`.map` approach for concurrency, and call `with_options` for each run
E.g.: https://docs.prefect.io/v3/how-to-guides/workflows/run-work-concurrently
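```python
from prefect import flow
from prefect.futures import wait

# cool_task is any @task-decorated function defined elsewhere
@flow
def my_workflow():
    futures = [cool_task.submit() for _ in range(10)]
    wait(futures)
```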
For anyone curious, I’ve successfully done this.
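The approach that worked here replaces `.map()` with one `with_options(...)` call per item, followed by `.submit()`. Below is a sketch of the per-item planning step. As this thread suggests, it assumes that `with_options` on a `@materialize` task accepts `assets` alongside normal task options like `task_run_name`. `plan_runs`, `process_data`, and the bucket are hypothetical. The Prefect calls appear only in the docstring, so the planning logic can be checked without running a flow.

```python
from typing import Any

Item = dict[str, Any]


def plan_runs(items: list[Item], base_uri: str) -> list[tuple[dict[str, Any], Item]]:
    """Pair each item with keyword arguments for a per-call with_options(...).

    Intended use inside a Prefect flow (not executed here; names hypothetical):

        futures = [
            process_data.with_options(**opts).submit(item)
            for opts, item in plan_runs(items, "s3://my-bucket/processed")
        ]
        wait(futures)
    """
    prefix = base_uri.rstrip("/")
    plan = []
    for item in items:
        item_id = str(item["id"])
        options = {
            "assets": [f"{prefix}/{item_id}"],  # one explicitly declared asset per run
            "task_run_name": f"process-{item_id}",  # readable run name per item
        }
        plan.append((options, item))
    return plan


plan = plan_runs(
    [{"id": "A001", "value": 1}, {"id": "A002", "value": 2}],
    "s3://my-bucket/processed",
)
```

Keeping the options as plain dicts makes the naming rules easy to unit-test on their own. Each submitted call still declares its asset explicitly, which matches the point above that assets, including their names, must be declared up front.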
k
nice!