Gio
07/10/2025, 9:14 PM

Marvin
07/10/2025, 9:15 PM

Marvin
07/10/2025, 9:16 PM
```python
from prefect import flow
from prefect.assets import materialize

# Flow 1
@flow
def flow_one():
    @materialize("my-data")
    def create_data_one():
        # Your data creation logic here
        data_one = ...
        return data_one

    return create_data_one()

# Flow 2
@flow
def flow_two():
    @materialize(
        "my-data",
        asset_deps=["upstream-asset"],  # if you have upstream dependencies
    )
    def create_data_two():
        # Your data creation logic here
        data_two = ...
        return data_two

    return create_data_two()
```
2. To maintain proper lineage when multiple flows write to the same asset:
- Use consistent asset keys across flows
- Explicitly declare upstream dependencies using asset_deps
- If one flow's output depends on the other flow's output, make sure to specify that dependency
3. If you want to track the full history of the asset, you can view it in the UI by:
- Going to the Assets page
- Clicking on the specific asset
- Looking at the "History" tab to see all flows that have written to it
You're only seeing the latest flow's upstream assets in the UI because each materialization creates a new version of the asset. This is expected behavior: the lineage shows the dependencies for the current state of the asset.
If you need to maintain a record of all upstream dependencies across different flows, you should:
1. Explicitly declare all potential upstream dependencies in each flow using asset_deps
2. Consider using asset metadata to track additional information about the asset's history (see the sketch below)
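Concretely, a single materializing task can declare several upstream sources at once and attach metadata as it runs. This is a minimal sketch with hypothetical keys; the add_asset_metadata helper is an assumption about the prefect.assets module, so verify it against your Prefect version:
```python
from prefect import flow
from prefect.assets import materialize, add_asset_metadata  # add_asset_metadata is assumed

# Hypothetical asset keys, for illustration only
MERGED_KEY = "s3://my-bucket/merged-output"
UPSTREAM_KEYS = ["staging://source_1", "staging://source_2"]

@flow
def merge_flow():
    # Declaring every potential upstream here keeps both edges in the
    # lineage graph regardless of which flow last materialized the asset
    @materialize(MERGED_KEY, asset_deps=UPSTREAM_KEYS)
    def merge_sources():
        # Record run-specific history on the asset as metadata
        add_asset_metadata(MERGED_KEY, {"row_count": 100})
        return {"merged": True}

    return merge_sources()
```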
You can find more detailed information about asset management in the Prefect Assets documentation.
Would you like me to provide a more specific example based on your actual flows, or would you like to know more about any particular aspect of asset management?

Gio
07/10/2025, 9:29 PM
```python
SQS_QUEUE = Asset(
    key="sqs://my-queue",
    asset_deps=[
        "staging://source_1",
        "staging://source_2",
    ]
)
```
Marvin
07/10/2025, 9:29 PM

Marvin
07/10/2025, 9:30 PM
In the current assets API, upstream dependencies aren't declared on the Asset object itself; they're declared with asset_deps on the @materialize decorator.
Here's the recommended approach to achieve what you want:
1. Create a shared module (e.g., assets.py) to define your asset keys:
```python
# assets.py
SQS_QUEUE_KEY = "sqs://my-queue"
SOURCE_1_KEY = "staging://source_1"
SOURCE_2_KEY = "staging://source_2"
```
2. Reference these keys in your flow files:
```python
# flow1.py
from prefect import flow
from prefect.assets import materialize

from assets import SQS_QUEUE_KEY, SOURCE_1_KEY

@flow
def flow_one():
    @materialize(
        SQS_QUEUE_KEY,
        asset_deps=[SOURCE_1_KEY],
    )
    def process_source_one():
        # Your processing logic here
        result = ...
        return result

    return process_source_one()

# flow2.py
from prefect import flow
from prefect.assets import materialize

from assets import SQS_QUEUE_KEY, SOURCE_2_KEY

@flow
def flow_two():
    @materialize(
        SQS_QUEUE_KEY,
        asset_deps=[SOURCE_2_KEY],
    )
    def process_source_two():
        # Your processing logic here
        result = ...
        return result

    return process_source_two()
```
This approach will:
- Show all upstream dependencies in the UI
- Maintain consistent asset keys across your codebase
- Track the lineage properly when either flow runs
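For instance, a small entry point (a hypothetical run_flows.py, shown only for illustration) that runs both flows will record both upstream edges against the same asset key:
```python
# run_flows.py (hypothetical entry point)
from flow1 import flow_one
from flow2 import flow_two

if __name__ == "__main__":
    flow_one()  # materializes sqs://my-queue with staging://source_1 upstream
    flow_two()  # materializes sqs://my-queue with staging://source_2 upstream
```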
If you need to add metadata about the SQS queue itself (like description, URL, etc.), you can create an Asset object, but use it separately from the lineage tracking:
```python
# assets.py
from prefect.assets import Asset

SQS_QUEUE_KEY = "sqs://my-queue"
SOURCE_1_KEY = "staging://source_1"
SOURCE_2_KEY = "staging://source_2"

# For metadata purposes only
SQS_QUEUE = Asset(
    key=SQS_QUEUE_KEY,
    properties={
        "description": "My SQS Queue for processing data",
        "url": "https://console.aws.amazon.com/sqs/...",
    },
)
```
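The @materialize decorator should also accept Asset objects in place of string keys, which would let you reuse the metadata-bearing definition directly; this is an assumption about the API, so verify it against your Prefect version:
```python
# flow1.py (a sketch, assuming @materialize accepts Asset instances)
from prefect import flow
from prefect.assets import materialize

from assets import SQS_QUEUE, SOURCE_1_KEY

@flow
def flow_one():
    @materialize(SQS_QUEUE, asset_deps=[SOURCE_1_KEY])
    def process_source_one():
        result = ...
        return result

    return process_source_one()
```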
The lineage in the UI will show both upstream sources when either flow runs, as long as you've properly declared the dependencies using asset_deps in the @materialize decorator.
Remember that the asset lineage shows the current state of dependencies, so you'll see all declared upstream dependencies regardless of which flow last materialized the asset.

Gio
07/10/2025, 9:36 PM

Marvin
07/10/2025, 9:36 PM

Marvin
07/10/2025, 9:36 PM
```python
# assets.py
SQS_QUEUE_KEY = "sqs://my-queue"
SOURCE_1_KEY = "staging://source_1"
SOURCE_2_KEY = "staging://source_2"
LAMBDA_OUTPUT_KEY = "lambda://my-function-output"  # represents where the Lambda stores its output

# flow1.py
from prefect import flow
from prefect.assets import materialize

from assets import SQS_QUEUE_KEY, SOURCE_1_KEY, LAMBDA_OUTPUT_KEY

@flow
def flow_one():
    # First materialize the SQS queue data
    @materialize(
        SQS_QUEUE_KEY,
        asset_deps=[SOURCE_1_KEY],
    )
    def write_to_sqs():
        # Your SQS writing logic here
        sqs_result = ...
        return sqs_result

    # Then track the Lambda processing
    @materialize(
        LAMBDA_OUTPUT_KEY,
        asset_deps=[SQS_QUEUE_KEY],  # show that the Lambda output depends on SQS
    )
    def track_lambda_processing():
        # You might not actually process anything here, but you can track
        # that the Lambda was triggered and/or monitor its completion
        lambda_result = ...
        return lambda_result

    write_to_sqs()
    track_lambda_processing()
```
This will create a lineage graph showing:
Source 1 → SQS Queue → Lambda Output
Source 2 → SQS Queue → Lambda Output
A few important points:
1. If your Lambda function writes its output somewhere (like S3), you can use that location as the `LAMBDA_OUTPUT_KEY`:
```python
LAMBDA_OUTPUT_KEY = "s3://my-bucket/lambda-output/"
```
2. You can add more detail about the Lambda function using an Asset definition:
```python
from prefect.assets import Asset

LAMBDA_FUNCTION = Asset(
    key=LAMBDA_OUTPUT_KEY,
    properties={
        "description": "AWS Lambda function processing SQS messages",
        "url": "https://console.aws.amazon.com/lambda/home?region=us-east-1#/functions/my-function",
        "owners": ["data-team@company.com"],
    },
)
```
3. If your Lambda function writes to multiple destinations, you can track each one:
```python
from prefect import flow
from prefect.assets import materialize

from assets import SQS_QUEUE_KEY, SOURCE_1_KEY

@flow
def flow_one():
    @materialize(
        SQS_QUEUE_KEY,
        asset_deps=[SOURCE_1_KEY],
    )
    def write_to_sqs():
        sqs_result = ...
        return sqs_result

    @materialize(
        "s3://output-bucket/processed/",
        asset_deps=[SQS_QUEUE_KEY],
    )
    def track_lambda_output_one():
        lambda_result_1 = ...
        return lambda_result_1

    @materialize(
        "dynamodb://my-table",
        asset_deps=[SQS_QUEUE_KEY],
    )
    def track_lambda_output_two():
        lambda_result_2 = ...
        return lambda_result_2

    write_to_sqs()
    track_lambda_output_one()
    track_lambda_output_two()
```
This approach gives you visibility into the complete data pipeline, from source through SQS to the final Lambda processing outputs, all visible in the Prefect UI's asset lineage view.
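If you want the materializing task to perform the real write as well, its body can call SQS directly. Here's a minimal sketch using boto3; the keys and queue URL are placeholders, not from your setup:
```python
import json

import boto3
from prefect import flow
from prefect.assets import materialize

SQS_QUEUE_KEY = "sqs://my-queue"
SOURCE_1_KEY = "staging://source_1"

@flow
def flow_one():
    @materialize(SQS_QUEUE_KEY, asset_deps=[SOURCE_1_KEY])
    def write_to_sqs():
        # Send a real message; the queue URL below is a placeholder
        sqs = boto3.client("sqs")
        response = sqs.send_message(
            QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
            MessageBody=json.dumps({"source": SOURCE_1_KEY}),
        )
        return response["MessageId"]

    return write_to_sqs()
```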
Remember that the @materialize decorator is primarily about tracking where data exists, so you're not necessarily executing the Lambda function in these flows, but rather tracking its input and output locations for lineage purposes.

Kevin Grismore
07/10/2025, 9:38 PM

Gio
07/10/2025, 9:41 PM

Kevin Grismore
07/10/2025, 9:45 PM

Gio
07/10/2025, 9:49 PM