Alexandre Lima
07/28/2025, 5:42 PMMarvin
07/28/2025, 5:42 PMMarvin
07/28/2025, 5:43 PMon_completion function with custom parameters. Here's a practical implementation:
python
from prefect import flow, task
from prefect.states import State
def completion_callback(flow_state: State, process_id: str):
"""Custom completion callback that includes a process_id parameter"""
print(f"Flow completed with state: {flow_state}")
print(f"Process ID was: {process_id}")
# Add your custom logic here
return flow_state
@flow(
on_completion=[
lambda state: completion_callback(state, process_id="12345")
]
)
def my_flow():
print("Doing some work...")
# Run the flow
my_flow()
Here's a more complex example that shows how to use multiple parameters and access flow run data:
python
from prefect import flow, task
from prefect.states import State
from typing import Any
def advanced_completion_callback(
flow_state: State,
process_id: str,
environment: str = "prod",
extra_metadata: dict[str, Any] | None = None
):
"""
Advanced completion callback with multiple custom parameters
Args:
flow_state: The state of the completed flow
process_id: Custom process identifier
environment: Environment name (default: "prod")
extra_metadata: Additional metadata to process
"""
# Access flow run details
flow_run_id = flow_state.state_details.flow_run_id
print(f"Flow {flow_run_id} completed with state: {flow_state.name}")
print(f"Process ID: {process_id}")
print(f"Environment: {environment}")
if extra_metadata:
print(f"Extra metadata: {extra_metadata}")
# Add your custom logic here
return flow_state
@flow(
on_completion=[
lambda state: advanced_completion_callback(
state,
process_id="PROC-123",
environment="staging",
extra_metadata={"customer_id": "CUS-456"}
)
]
)
def my_advanced_flow():
print("Executing flow...")
# Run the flow
my_advanced_flow()
Key points to remember:
1. The callback function must accept the flow_state parameter as its first argument - this is required by Prefect.
2. Additional parameters can be added after the flow_state parameter.
3. Use a lambda function in the on_completion list to pass custom parameters to your callback.
4. The callback function must return the flow state.
5. You can access flow run details through the flow_state parameter.
You can also create reusable callbacks by using a class-based approach:
python
from dataclasses import dataclass
from prefect.states import State
@dataclass
class CustomCompletionHook:
process_id: str
environment: str = "prod"
def __call__(self, flow_state: State) -> State:
print(f"Flow completed with state: {flow_state}")
print(f"Process ID: {self.process_id}")
print(f"Environment: {self.environment}")
return flow_state
@flow(
on_completion=[CustomCompletionHook(process_id="12345", environment="staging")]
)
def my_flow():
print("Doing some work...")
This class-based approach is particularly useful when you need to reuse the same callback with different parameters across multiple flows.
Remember that on_completion hooks are executed after the flow completes, regardless of whether it succeeded or failed. If you need to handle different states differently, you can check flow_state.is_completed or flow_state.name within your callback.