<@ULVA73B9P> how to create on_completion func with...
# ask-marvin
a
@Marvin how to create on_completion func with custom params like process_id?
m
thought for 79.7 seconds
Let me provide you with a clear example of how to create an
on_completion
function with custom parameters. Here's a practical implementation:
Copy code
python
from prefect import flow, task
from prefect.states import State

def completion_callback(flow_state: State, process_id: str):
    """Custom completion callback that includes a process_id parameter"""
    print(f"Flow completed with state: {flow_state}")
    print(f"Process ID was: {process_id}")
    # Add your custom logic here
    return flow_state

@flow(
    on_completion=[
        lambda state: completion_callback(state, process_id="12345")
    ]
)
def my_flow():
    print("Doing some work...")

# Run the flow
my_flow()
Here's a more complex example that shows how to use multiple parameters and access flow run data:
Copy code
python
from prefect import flow, task
from prefect.states import State
from typing import Any

def advanced_completion_callback(
    flow_state: State,
    process_id: str,
    environment: str = "prod",
    extra_metadata: dict[str, Any] | None = None
):
    """
    Advanced completion callback with multiple custom parameters
    
    Args:
        flow_state: The state of the completed flow
        process_id: Custom process identifier
        environment: Environment name (default: "prod")
        extra_metadata: Additional metadata to process
    """
    # Access flow run details
    flow_run_id = flow_state.state_details.flow_run_id
    
    print(f"Flow {flow_run_id} completed with state: {flow_state.name}")
    print(f"Process ID: {process_id}")
    print(f"Environment: {environment}")
    
    if extra_metadata:
        print(f"Extra metadata: {extra_metadata}")
    
    # Add your custom logic here
    return flow_state

@flow(
    on_completion=[
        lambda state: advanced_completion_callback(
            state,
            process_id="PROC-123",
            environment="staging",
            extra_metadata={"customer_id": "CUS-456"}
        )
    ]
)
def my_advanced_flow():
    print("Executing flow...")

# Run the flow
my_advanced_flow()
Key points to remember: 1. The callback function must accept the
flow_state
parameter as its first argument - this is required by Prefect. 2. Additional parameters can be added after the
flow_state
parameter. 3. Use a lambda function in the
on_completion
list to pass custom parameters to your callback. 4. The callback function must return the flow state. 5. You can access flow run details through the
flow_state
parameter. You can also create reusable callbacks by using a class-based approach:
Copy code
python
from dataclasses import dataclass
from prefect.states import State

@dataclass
class CustomCompletionHook:
    process_id: str
    environment: str = "prod"

    def __call__(self, flow_state: State) -> State:
        print(f"Flow completed with state: {flow_state}")
        print(f"Process ID: {self.process_id}")
        print(f"Environment: {self.environment}")
        return flow_state

@flow(
    on_completion=[CustomCompletionHook(process_id="12345", environment="staging")]
)
def my_flow():
    print("Doing some work...")
This class-based approach is particularly useful when you need to reuse the same callback with different parameters across multiple flows. Remember that
on_completion
hooks are executed after the flow completes, regardless of whether it succeeded or failed. If you need to handle different states differently, you can check
flow_state.is_completed
or
flow_state.name
within your callback.