# ask-marvin
Gio
@Marvin I am working on setting up assets. I have two flows that write to the same asset. How can I set this up properly? When I check the UI, the asset lineage only shows the upstream assets from the flow that ran most recently.
Marvin
thought for 106.7 seconds
Let me help you set up your flows correctly to maintain proper asset lineage. Here's how to structure two flows that write to the same asset:

1. Use the same asset key in both flows to indicate that they write to the same asset. For example:
```python
from prefect import flow
from prefect.assets import materialize

# Asset keys are URIs; the s3://my-bucket/... paths are placeholders.


# Flow 1: materialize the shared asset
@materialize("s3://my-bucket/my-data")
def create_data_one():
    data_one = ...  # Your data creation logic here
    return data_one


@flow
def flow_one():
    return create_data_one()


# Flow 2: same asset key, plus an explicit upstream dependency
@materialize(
    "s3://my-bucket/my-data",
    asset_deps=["s3://my-bucket/upstream-asset"],  # If you have upstream dependencies
)
def create_data_two():
    data_two = ...  # Your data creation logic here
    return data_two


@flow
def flow_two():
    return create_data_two()
```
2. To maintain proper lineage when multiple flows write to the same asset:
   - Use consistent asset keys across flows.
   - Explicitly declare upstream dependencies using `asset_deps`.
   - If one flow's output depends on the other flow's output, make sure to declare that dependency.
3. To track the full history of the asset, view it in the UI:
   - Go to the Assets page.
   - Click the specific asset.
   - Open the "History" tab to see all flows that have written to it.

You're only seeing the latest flow's upstream assets in the UI because each materialization creates a new version of the asset. This is expected behavior: the lineage shows the dependencies for the current state of the asset. If you need to keep a record of all upstream dependencies across different flows, you should:

1. Explicitly declare all potential upstream dependencies in each flow using `asset_deps`.
2. Consider using asset metadata to track additional information about the asset's history.

You can find more detailed information about asset management in the Prefect Assets documentation. Would you like a more specific example based on your actual flows, or would you like to know more about a particular aspect of asset management?
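The toy model below shows the "latest materialization wins" behavior in plain Python. It is an illustration under stated assumptions, not Prefect's internal implementation. The `LatestOnlyLineage` class and its method are hypothetical, and the keys reuse the SQS example from later in this thread. Two flows write `sqs://my-queue` with different `asset_deps`, and only the second flow's upstream asset remains.

```python
from dataclasses import dataclass, field


@dataclass
class LatestOnlyLineage:
    """Hypothetical toy store: each materialization replaces the previous record."""

    upstream: dict[str, set[str]] = field(default_factory=dict)

    def record_materialization(self, asset_key: str, asset_deps: list[str]) -> None:
        # Overwrite rather than merge: only the newest run's dependencies survive.
        self.upstream[asset_key] = set(asset_deps)


store = LatestOnlyLineage()
store.record_materialization("sqs://my-queue", ["staging://source_1"])  # flow_one runs
store.record_materialization("sqs://my-queue", ["staging://source_2"])  # flow_two runs
print(sorted(store.upstream["sqs://my-queue"]))  # ['staging://source_2']
```

Declaring more `asset_deps` in each flow changes what each record contains. In this model, though, it does not make earlier records reappear.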
Gio
@Marvin could I define an asset like this and reference it in multiple files? Would this make the lineage show multiple upstream sources?
```python
SQS_QUEUE = Asset(
    key="sqs://my-queue",
    asset_deps=[
        "staging://source_1",
        "staging://source_2",
    ]
)
```
Marvin
thought for 42.8 seconds
You can define an `Asset` object like that, but it's not the recommended approach in Prefect 3.x for tracking lineage across multiple flows. The `Asset` class is primarily used to reference external systems or to attach asset metadata. The actual lineage tracking is handled through the `@materialize` decorator. Here's the recommended approach:

1. Create a shared module (e.g., `assets.py`) to define your asset keys:
```python
# assets.py
SQS_QUEUE_KEY = "sqs://my-queue"
SOURCE_1_KEY = "staging://source_1"
SOURCE_2_KEY = "staging://source_2"
```
2. Reference these keys in your flow files:
```python
# flow1.py
from prefect import flow
from prefect.assets import materialize

from assets import SOURCE_1_KEY, SQS_QUEUE_KEY  # assets.py sits next to this file


@materialize(SQS_QUEUE_KEY, asset_deps=[SOURCE_1_KEY])
def process_source_one():
    result = ...  # Your processing logic here
    return result


@flow
def flow_one():
    return process_source_one()


# flow2.py
from prefect import flow
from prefect.assets import materialize

from assets import SOURCE_2_KEY, SQS_QUEUE_KEY


@materialize(SQS_QUEUE_KEY, asset_deps=[SOURCE_2_KEY])
def process_source_two():
    result = ...  # Your processing logic here
    return result


@flow
def flow_two():
    return process_source_two()
```
This approach will:
- Show all upstream dependencies in the UI
- Keep asset keys consistent across your codebase
- Track the lineage properly when either flow runs

If you need to add metadata about the SQS queue itself (such as a description or URL), you can create an `Asset` object, but keep it separate from the lineage tracking:
```python
# assets.py
from prefect.assets import Asset

SQS_QUEUE_KEY = "sqs://my-queue"
SOURCE_1_KEY = "staging://source_1"
SOURCE_2_KEY = "staging://source_2"

# For metadata purposes only
SQS_QUEUE = Asset(
    key=SQS_QUEUE_KEY,
    properties={
        "description": "My SQS Queue for processing data",
        "url": "https://console.aws.amazon.com/sqs/...",
    },
)
```
The lineage in the UI will show both upstream sources when either flow runs, as long as you've properly declared the dependencies using `asset_deps` in the `@materialize` decorator. Remember that the asset lineage shows the current state of dependencies, so you'll see all declared upstream dependencies regardless of which flow last materialized the asset.
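Both flows import their keys from one shared module, so a typo or stray markup in that module would make them write to two different assets. The sketch below is a rough local sanity check. It is not Prefect's own key validation, which applies its own rules. The hypothetical `invalid_asset_keys` helper flags any key that does not parse as `scheme://location`, for example a bare name like `my-data` or a key still wrapped in Slack's `<...>` link markup.

```python
from urllib.parse import urlparse

# Shared asset keys, mirroring the assets.py module in this thread.
SQS_QUEUE_KEY = "sqs://my-queue"
SOURCE_1_KEY = "staging://source_1"
SOURCE_2_KEY = "staging://source_2"


def invalid_asset_keys(keys: list[str]) -> list[str]:
    """Return the keys that do not look like scheme://location URIs."""
    bad = []
    for key in keys:
        parsed = urlparse(key)
        has_location = bool(parsed.netloc or parsed.path)
        has_whitespace = any(ch.isspace() for ch in key)
        # Require a scheme and a location, and reject embedded whitespace.
        if not parsed.scheme or not has_location or has_whitespace:
            bad.append(key)
    return bad


print(invalid_asset_keys([SQS_QUEUE_KEY, SOURCE_1_KEY, SOURCE_2_KEY]))  # []
```

Running this check in a unit test that imports `assets.py` is one way to catch a broken key before either flow deploys.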
Gio
@Marvin can I also show processing that happens downstream of the SQS queue? For example, the SQS queue will send data to a Lambda function.
Marvin
thought for 42.1 seconds
Yes, you can represent the downstream Lambda processing in your asset lineage. Here's how to structure it:
```python
# assets.py
SQS_QUEUE_KEY = "sqs://my-queue"
SOURCE_1_KEY = "staging://source_1"
SOURCE_2_KEY = "staging://source_2"
LAMBDA_OUTPUT_KEY = "lambda://my-function-output"  # Represents where Lambda stores its output

# flow1.py
from prefect import flow
from prefect.assets import materialize

from assets import LAMBDA_OUTPUT_KEY, SOURCE_1_KEY, SQS_QUEUE_KEY


# First materialize the SQS queue data
@materialize(SQS_QUEUE_KEY, asset_deps=[SOURCE_1_KEY])
def write_to_sqs():
    sqs_result = ...  # Your SQS writing logic
    return sqs_result


# Then track the Lambda processing: its output depends on the SQS queue
@materialize(LAMBDA_OUTPUT_KEY, asset_deps=[SQS_QUEUE_KEY])
def track_lambda_processing():
    # You might not actually process anything here,
    # but you can track that the Lambda was triggered
    # and/or monitor its completion
    lambda_result = ...
    return lambda_result


@flow
def flow_one():
    write_to_sqs()
    track_lambda_processing()
```
This will create a lineage graph showing:
```
Source 1 → SQS Queue → Lambda Output
Source 2 → SQS Queue → Lambda Output
```
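The diagram comes from composing the declared edges. Each `asset_deps` entry is an edge into its asset, and walking those edges back from the Lambda output gives one path per source. This is a minimal plain-Python sketch. The `DECLARED_DEPS` map and `lineage_paths` function are hypothetical, the map merges what both flows declare, and the walk assumes the graph has no cycles.

```python
# Edges declared across both flows: asset key -> its asset_deps.
DECLARED_DEPS = {
    "sqs://my-queue": ["staging://source_1", "staging://source_2"],
    "lambda://my-function-output": ["sqs://my-queue"],
}


def lineage_paths(asset: str, deps: dict[str, list[str]]) -> list[list[str]]:
    """Enumerate every source-to-asset path that ends at `asset` (assumes no cycles)."""
    upstream = deps.get(asset, [])
    if not upstream:
        return [[asset]]  # a source: no declared dependencies
    paths = []
    for dep in upstream:
        for path in lineage_paths(dep, deps):
            paths.append(path + [asset])
    return paths


for path in lineage_paths("lambda://my-function-output", DECLARED_DEPS):
    print(" → ".join(path))
```

This map combines the declarations of both flows. Whether the UI displays that combined view depends on how Prefect records each materialization.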
A few important points:

1. If your Lambda function writes its output somewhere (like S3), you can use that location as the `LAMBDA_OUTPUT_KEY`:

```python
LAMBDA_OUTPUT_KEY = "s3://my-bucket/lambda-output/"
```
2. You can add more detail about the Lambda function using an `Asset` definition:

```python
# assets.py, alongside LAMBDA_OUTPUT_KEY
from prefect.assets import Asset

LAMBDA_FUNCTION = Asset(
    key=LAMBDA_OUTPUT_KEY,
    properties={
        "description": "AWS Lambda function processing SQS messages",
        "url": "https://console.aws.amazon.com/lambda/home?region=us-east-1#/functions/my-function",
        "owners": ["data-team@company.com"],
    },
)
```
3. If your Lambda function writes to multiple destinations, you can track each one:

```python
from prefect import flow
from prefect.assets import materialize

from assets import SOURCE_1_KEY, SQS_QUEUE_KEY


@materialize(SQS_QUEUE_KEY, asset_deps=[SOURCE_1_KEY])
def write_to_sqs():
    sqs_result = ...  # Your SQS writing logic
    return sqs_result


@materialize("s3://output-bucket/processed/", asset_deps=[SQS_QUEUE_KEY])
def track_lambda_output_one():
    lambda_result_1 = ...
    return lambda_result_1


@materialize("dynamodb://my-table", asset_deps=[SQS_QUEUE_KEY])
def track_lambda_output_two():
    lambda_result_2 = ...
    return lambda_result_2


@flow
def flow_one():
    write_to_sqs()
    track_lambda_output_one()
    track_lambda_output_two()
```
This approach gives you visibility into the complete data pipeline, from the sources through SQS to the final Lambda outputs, all in the Prefect UI's asset lineage view. Remember that the `@materialize` decorator is primarily about tracking where data lives. You're not necessarily executing the Lambda function in these flows. You're tracking its input and output locations for lineage purposes.
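You may want a tracking task to finish only after the Lambda has actually written its output, which is the "monitor its completion" idea in the comments above. A generic polling helper is one way to do that. This is a minimal sketch, and `wait_for_output` is hypothetical. In practice, `output_exists` might check for the S3 object the Lambda writes, for example with boto3's `head_object`. That wiring is an assumption and is not shown here.

```python
import time
from typing import Callable


def wait_for_output(
    output_exists: Callable[[], bool],
    timeout_s: float = 300.0,
    interval_s: float = 5.0,
) -> bool:
    """Poll `output_exists` until it returns True or `timeout_s` elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if output_exists():
            return True
        if time.monotonic() >= deadline:
            return False  # gave up: the output never appeared
        time.sleep(interval_s)
```

For example, `track_lambda_processing` could call this helper and raise if it returns `False`. A missing Lambda output then fails the task instead of passing silently.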
Kevin Grismore
@Gio Right now, Prefect can only show the data it has seen about the most recent materialization of an asset, so what you're experiencing is the expected outcome. One of our goals for enhancing assets is a partitions concept, so that multiple executions can contribute to an asset without overwriting it.
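Partitions are a stated goal, not a current Prefect API. As a purely hypothetical illustration of why they would help, the toy `PartitionedLineage` class below stores each execution's dependencies under its own partition instead of overwriting them. The union of upstream assets therefore survives. Nothing here reflects Prefect's actual or planned design.

```python
from collections import defaultdict


class PartitionedLineage:
    """Hypothetical toy model: each execution writes its own partition."""

    def __init__(self) -> None:
        # asset key -> partition id -> upstream keys recorded for that partition
        self._partitions: dict[str, dict[str, set[str]]] = defaultdict(dict)

    def record(self, asset_key: str, partition: str, asset_deps: list[str]) -> None:
        # Re-running the same partition replaces only that partition's record.
        self._partitions[asset_key][partition] = set(asset_deps)

    def upstream(self, asset_key: str) -> set[str]:
        # Union across partitions, so every contributing flow stays visible.
        result: set[str] = set()
        for deps in self._partitions.get(asset_key, {}).values():
            result |= deps
        return result


lineage = PartitionedLineage()
lineage.record("sqs://my-queue", "flow_one", ["staging://source_1"])
lineage.record("sqs://my-queue", "flow_two", ["staging://source_2"])
print(sorted(lineage.upstream("sqs://my-queue")))  # ['staging://source_1', 'staging://source_2']
```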
Gio
ok, thank you @Kevin Grismore! I wasn't sure whether I should expect something like dbt, where the full lineage always shows. So for now, should I wait for assets to be built out more if I'm looking for something more like dbt's lineage?
Kevin Grismore
I'm not sure exactly what you mean by dbt lineage, but dbt's lineage is compiled before dbt actually executes anything. Prefect's lineage is discovered at runtime, so we have to take a different approach to how we record and display that data. Are there any other shortcomings you're running into? I think it's fine to hold off until we meet your needs if what we offer doesn't make sense for your system's design.
Gio
With dbt, I just mean that the full lineage is always present. I'll be fine for now with the way Prefect displays the lineage. I just wasn't sure whether I was using it incorrectly.