Zivile (07/10/2025, 3:32 PM)

Marvin (07/10/2025, 3:33 PM)

```python
from datetime import datetime

from prefect import task
from prefect.events.utilities import emit_event


@task
def process_data():
    # Your task logic here

    # Emit an event
    emit_event(
        event="data.processed",  # Name of your event
        resource={
            "prefect.resource.id": "my-task",  # Unique identifier for the resource
            "prefect.resource.name": "process_data_task",  # Name of the resource
        },
        payload={
            "records_processed": 100,
            "status": "success",
        },
    )
    return "Processing complete"


# Alternative example with more details
@task
def analyze_data():
    # Your analysis logic here
    emit_event(
        event="data.analyzed",
        resource={
            "prefect.resource.id": "analysis-task-001",
            "prefect.resource.name": "data_analyzer",
        },
        occurred=datetime.utcnow(),  # Explicitly set the occurrence time
        related=[
            {
                "prefect.resource.id": "dataset-123",
                "prefect.resource.name": "customer_dataset",
            }
        ],
        payload={
            "analysis_type": "customer_segmentation",
            "segments_found": 5,
            "total_records": 1000,
        },
    )
    return "Analysis complete"
```
Key points about emitting events:
1. The `event` parameter should follow a dot-notation naming convention (e.g., "data.processed", "user.created")
2. The `resource` dictionary must contain at least:
   - `prefect.resource.id`: A unique identifier for the resource
   - `prefect.resource.name`: A human-readable name for the resource
3. The `payload` can contain any JSON-serializable data relevant to the event (see the sketch after this list)
4. `related` resources can be specified to link the event to other resources in your system
5. If you don't specify `occurred`, it defaults to the current time
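Because the payload has to be JSON-serializable, it's worth converting values like `datetime` or `Decimal` before emitting. A minimal sketch (the `finished_at` and `revenue` field names are illustrative, not a Prefect convention):

```python
from datetime import datetime, timezone
from decimal import Decimal

from prefect.events.utilities import emit_event

finished_at = datetime.now(timezone.utc)
revenue = Decimal("19.99")

emit_event(
    event="data.processed",
    resource={
        "prefect.resource.id": "my-task",
        "prefect.resource.name": "process_data_task",
    },
    payload={
        # Convert non-JSON types up front so the payload serializes cleanly
        "finished_at": finished_at.isoformat(),
        "revenue": float(revenue),
    },
)
```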
Events can be useful for:
- Tracking important milestones in your data pipeline
- Triggering downstream processes (see the trigger sketch after this list)
- Monitoring and observability
- Creating audit trails
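For the downstream-process case, a custom event can start a deployment. A hedged sketch, assuming Prefect 3.x's `DeploymentEventTrigger` from `prefect.events`; the flow and deployment names here are hypothetical:

```python
from prefect import flow
from prefect.events import DeploymentEventTrigger


@flow
def downstream(records_processed: int = 0):
    print(f"Reacting to {records_processed} processed records")


if __name__ == "__main__":
    # Serve a deployment that runs whenever the custom event above is seen
    downstream.serve(
        name="react-to-data-processed",  # hypothetical deployment name
        triggers=[
            DeploymentEventTrigger(
                expect={"data.processed"},                 # event name to watch for
                match={"prefect.resource.id": "my-task"},  # only from this resource
                parameters={
                    # Jinja template resolved from the triggering event's payload
                    "records_processed": "{{ event.payload.records_processed }}"
                },
            )
        ],
    )
```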
You can find more details about events in the Prefect documentation.

Zivile (07/10/2025, 3:35 PM)

Marvin (07/10/2025, 3:35 PM)

```python
from datetime import datetime

from prefect import task
from prefect.events import emit_event


@task
def process_data():
    # Your task logic here

    # Emit an event
    emit_event(
        event="data.processed",  # Name of your event
        resource={
            "prefect.resource.id": "my-task"  # Unique identifier for the resource
        },
        payload={
            "records_processed": 100,
            "status": "success",
        },
    )
    return "Processing complete"


# Alternative example with more details
@task
def analyze_data():
    # Your analysis logic here
    emit_event(
        event="data.analyzed",
        resource={
            "prefect.resource.id": "analysis-task-001"
        },
        occurred=datetime.utcnow(),  # Explicitly set the occurrence time
        related=[
            {
                "prefect.resource.id": "dataset-123"
            }
        ],
        payload={
            "analysis_type": "customer_segmentation",
            "segments_found": 5,
            "total_records": 1000,
        },
    )
    return "Analysis complete"
```
Key differences from Prefect 3.x:
1. The import is `from prefect.events import emit_event` instead of `from prefect.events.utilities import emit_event` (see the import shim after this list)
2. The `resource` dictionary only requires the `prefect.resource.id` field (the `name` field is not required)
3. Similarly, related resources only need the `prefect.resource.id` field
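If the same codebase needs to run under both versions, a small import shim avoids pinning to one location. This sketch is based only on the two import paths mentioned above:

```python
# Try the Prefect 2.x public location first, then fall back to the 3.x utilities module.
try:
    from prefect.events import emit_event
except ImportError:
    from prefect.events.utilities import emit_event
```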
Events in Prefect 2.19 can be used to:
- Track important milestones in your pipeline
- Enable event-driven automation in Prefect Cloud
- Create audit trails for your workflows
- Trigger downstream processes based on task completion or other events
You can find more details about events in the Prefect 2.x documentation.

Zivile (07/10/2025, 3:48 PM)

Marvin (07/10/2025, 3:50 PM)

```python
import uuid

from prefect import get_run_logger, task
from prefect.events import emit_event


@task
def process_data(dataset_name: str):
    logger = get_run_logger()

    # Option 1: Using task name and a unique identifier
    resource_id = f"task.process_data.{uuid.uuid4()}"

    # Option 2: Using dataset name as part of the identifier
    resource_id_2 = f"dataset.{dataset_name}.processor"

    # Option 3: Using multiple components
    resource_id_3 = f"data.processing.{dataset_name}.{uuid.uuid4()}"

    emit_event(
        event="data.processed",
        resource={
            "prefect.resource.id": resource_id
        },
        payload={
            "dataset": dataset_name,
            "status": "complete",
        },
    )
    logger.info(f"Emitted event with resource ID: {resource_id}")
    return "Processing complete"


# Example with multiple related resources
@task
def transform_data(input_dataset: str, output_dataset: str):
    # Main resource ID for the transformation
    main_resource_id = f"transform.{input_dataset}_to_{output_dataset}"

    emit_event(
        event="data.transformed",
        resource={
            "prefect.resource.id": main_resource_id
        },
        related=[
            {
                "prefect.resource.id": f"dataset.{input_dataset}"
            },
            {
                "prefect.resource.id": f"dataset.{output_dataset}"
            }
        ],
        payload={
            "input_dataset": input_dataset,
            "output_dataset": output_dataset,
        },
    )
```
Best practices for setting resource IDs:
1. Use Consistent Patterns
```python
# Pattern: category.subcategory.identifier
resource_id = "task.data_processing.daily_sales"
resource_id = "flow.etl.customer_data"
```
2. Include Meaningful Information
```python
# Include relevant identifiers
resource_id = f"processor.{dataset_name}.{process_type}"
```
3. Ensure Uniqueness When Needed
```python
# Combine static and dynamic elements
resource_id = f"task.processor.{dataset_name}.{uuid.uuid4()}"
```
4. Hierarchical Structure
```python
# Use dots to create hierarchy
resource_id = f"department.{dept_name}.process.{process_id}"
```
5. Keep IDs Stable for the Same Resource
```python
# Use stable identifiers when the resource is permanent
resource_id = f"daily_report.{report_date}"
```
Key points to remember:
- Resource IDs should be unique within your workspace
- Use dot notation to create hierarchical structure
- Make IDs descriptive and meaningful
- Include stable identifiers when possible
- Add unique identifiers (like UUIDs) when needed for one-time events
- Keep the format consistent across your application (a small helper for this is sketched after this list)
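One way to enforce that consistency is a tiny helper function. This is purely illustrative and not part of Prefect:

```python
def make_resource_id(*parts: str) -> str:
    """Join non-empty parts into a dot-separated, lowercase resource ID."""
    cleaned = [str(p).strip().replace(" ", "_").lower() for p in parts if p]
    return ".".join(cleaned)


# Example: build the same shape of ID everywhere in the codebase
print(make_resource_id("dataset", "Customer Data", "processor"))
# -> dataset.customer_data.processor
```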
The resource ID structure helps in:
- Tracking event sources
- Filtering and querying events
- Organizing events hierarchically
- Maintaining relationships between different resources
- Creating meaningful automation rules (see the matching sketch after this list)
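Hierarchical IDs pay off in automation rules because triggers can match on a prefix. A hedged sketch, assuming a Prefect version that exposes `DeploymentEventTrigger` from `prefect.events` and supports trailing-wildcard matching; the `transform.*` prefix refers to the IDs built in the code above:

```python
from prefect.events import DeploymentEventTrigger

# Match any event whose resource ID falls under the "transform." hierarchy
trigger = DeploymentEventTrigger(
    expect={"data.transformed"},
    match={"prefect.resource.id": "transform.*"},  # trailing wildcard over the hierarchy
)
```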
Remember that resource IDs are used to track the source of events, so they should be chosen carefully to make event monitoring and debugging easier.