# ask-marvin
k
@Marvin Can the `on_completion` argument accept a callable that executes a final task within the same flow run?
m
Based on my search, there isn't a direct reference detailing the use of the `on_completion` argument with a callable to execute a final task within the same flow run. For reference, `on_completion` accepts a list of callables, each called as `hook(flow, flow_run, state)` once the flow run has entered a `Completed` state. Prefect documentation does provide insight into task dependency management and using task states:
- Task Runners in Prefect

For controlling execution flow, Prefect uses constructs like `.wait()` and `wait_for=[]` to manage task dependencies, ensuring tasks run in a specific order. However, this isn't the specific `on_completion` behavior you're asking about. Given these findings, it would be best to explore Prefect's dependency management for running tasks in sequence, or to reach out to the Prefect community or forums for more detailed use cases. If you'd like to delve deeper into this topic or need a practical code example, let me know!
k
@Marvin discuss and compare the following two solutions to creating a callback for both tasks and flows. Option 1: decorate the flow and task functions such that, before either completes, the duration of the flow or task (based on the existence of flow_context or task_context) is sent to an external logging aggregator over an API. Option 2: emit events on completion of the flow or task and trigger a deployment that accepts the duration and the flow or task name and sends the duration payload to the external logging aggregator over an API. Understand that the context of the Prefect deployment is individual flows running on Kubernetes pods.
m
Let's delve into the two solutions for creating a callback that logs the duration of Prefect tasks and flows, and compare them.

### Option 1: Decorator-Based Solution
In this approach, you decorate the flow and task functions so that, before they complete, their durations are sent to an external logging aggregator via an API. The duration is calculated directly within the flow or task function, and the existence of a flow or task context determines which kind of run is being timed.

#### Pros:
- Simplicity: The logic lives directly in the function, making it straightforward and easy to understand.
- Immediate Logging: The logging happens within the task or flow context, so the duration is sent as soon as the task or flow body finishes.
- Less Overhead: No additional infrastructure or deployment is needed.

#### Cons:
- Maintenance: Requires adding the decorator to each task and flow, which means more repetitive code to maintain.
- Tight Coupling: Binds the duration-logging logic directly to the task and flow code, making it less modular.
- Scalability: As the number of tasks and flows grows, managing the decorators can become cumbersome.

### Option 2: Event-Based Solution
In this approach, events are emitted when a task or flow completes. These events trigger a deployment (another Prefect flow) that sends the duration and the flow or task name to an external logging aggregator via an API.

#### Pros:
- Decoupled Logic: The logging logic is separate from the task and flow logic, leading to more modular and maintainable code.
- Flexibility: Changing the logging logic or the logging aggregator involves changing only the logging deployment, not every task and flow.
- Scalability: Better suited for larger and more complex deployments; additional tasks and flows can be added without modifying the existing logging setup.
- Resilience: If the deployment that sends to the logging aggregator fails, it doesn't affect the completion of the main task or flow.

#### Cons:
- Complexity: Introduces additional complexity with event handling and deployment triggers.
- Latency: Logging is not immediate, as it depends on the deployment being triggered and running to completion.
- Operational Overhead: Requires that the infrastructure (e.g., Kubernetes) is correctly set up to handle the event-based triggers and deployments.

### Prefect Deployment Context – Kubernetes Pods
In the context of Prefect deployments running individual flows on Kubernetes pods, the considerations are more nuanced:
- Resource Utilization: Option 1 has minimal impact on Kubernetes resources, as the logging happens within the same pod. Option 2 may start an additional pod or job for each triggered run, increasing resource utilization.
- Fault Tolerance: Kubernetes provides mechanisms for handling pod failures. If your logging logic is decoupled (Option 2), Kubernetes can reschedule the logging job independently of the main flow.
- Scalability: Option 2 scales better in a Kubernetes environment, as it leverages the event-driven model and Kubernetes' ability to handle many short-lived pods.

### Recommendations
- Small to Medium Deployments: Option 1 might be more attractive for its simplicity and immediacy.
- Large and Complex Deployments: Option 2, with its decoupled and scalable architecture, is better suited to complex environments with many tasks and flows.

### Example Implementations
#### Option 1: Decorator-Based

```python
import functools
import time

from prefect import flow, task
from prefect.context import FlowRunContext, TaskRunContext

def send_to_logging_aggregator(name, duration):
    # Placeholder: send the duration to your logging aggregator's API here
    print(f"{name} took {duration:.3f}s")

def log_duration(func):
    @functools.wraps(func)  # keep the function's name and signature for Prefect
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        # Check the task context first: code in a task may also see the flow run context
        task_ctx, flow_ctx = TaskRunContext.get(), FlowRunContext.get()
        if task_ctx is not None:
            name = task_ctx.task.name
        elif flow_ctx is not None:
            name = flow_ctx.flow.name
        else:
            name = func.__name__  # called outside any Prefect run
        send_to_logging_aggregator(name, duration)
        return result
    return wrapper

@task
@log_duration
def my_task():
    # Task logic here
    pass

@flow
@log_duration
def my_flow():
    my_task()
```
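In Option 1 the aggregator call runs inside the task or flow, so a slow or failing API would slow down or fail the run itself. A minimal sketch of `send_to_logging_aggregator` using only the standard library, assuming a hypothetical aggregator endpoint (`AGGREGATOR_URL`) that accepts JSON over HTTP POST; the short timeout and returning `False` instead of raising are design choices of this sketch, not Prefect behavior.

```python
import json
import urllib.error
import urllib.request

# Hypothetical endpoint: replace with your aggregator's ingest URL
AGGREGATOR_URL = "http://logging-aggregator.example.internal/ingest"


def send_to_logging_aggregator(
    name: str, duration: float, url: str = AGGREGATOR_URL, timeout: float = 2.0
) -> bool:
    """POST {"name", "duration_seconds"} as JSON; return False instead of raising."""
    body = json.dumps({"name": name, "duration_seconds": duration}).encode("utf-8")
    try:
        request = urllib.request.Request(
            url,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError, ValueError):
        # Aggregator down, unreachable, or a bad URL: never fail the flow or task
        return False
```

Inside the decorator the call stays `send_to_logging_aggregator(name, duration)`; a `False` return could be recorded with `get_run_logger()` rather than failing the run.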
#### Option 2: Event-Based

```python
import time

from prefect import flow, task
from prefect.events import emit_event


@task
def my_task():
    start = time.perf_counter()
    # Task processing...
    duration = time.perf_counter() - start
    # Publish a custom event; a deployment trigger configured on the logging
    # deployment can match this event name and start a run with its payload.
    emit_event(
        event="custom.task.completed",  # arbitrary event name
        resource={"prefect.resource.id": "my-project.task.my_task"},  # arbitrary id
        payload={"task_name": "my_task", "duration": duration},
    )
    return duration


@flow
def my_flow():
    my_task()


# Deployed separately (e.g., as its own Kubernetes job) and started by the
# trigger; its parameters are filled from the event payload.
@flow
def send_duration(task_name: str, duration: float):
    send_to_logging_aggregator(task_name, duration)


def send_to_logging_aggregator(task_name, duration):
    # Send to logging aggregator logic (e.g., an HTTP POST to its API)
    pass
```
Both solutions have their merits. Selecting the best approach will depend on your specific needs and the scale of your deployment.
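In Option 2 the task has to measure its own duration before emitting the event; `get_run_logger()` returns a logger and does not carry start or end times. A small standard-library context manager can hold that measurement; the name `timed` is hypothetical.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def timed() -> Iterator[Dict[str, float]]:
    """Measure the wall-clock time of the enclosed block.

    The yielded dict gets a "seconds" key when the block exits,
    including when it exits with an exception.
    """
    result: Dict[str, float] = {}
    start = time.perf_counter()
    try:
        yield result
    finally:
        result["seconds"] = time.perf_counter() - start


if __name__ == "__main__":
    with timed() as t:
        time.sleep(0.1)  # stand-in for the task's processing
    print(f"took {t['seconds']:.3f}s")
```

Inside the task, the processing would go in a `with timed() as t:` block, and `t["seconds"]` would then be sent as `duration` in the event payload.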
k
@Marvin how can I retrieve and use the actual flow name (the deployment name, or the name of the flow, which by default is the name of the function) in an `on_completion` hook such as:
```python
from prefect import Flow, State
from prefect.client.schemas.objects import FlowRun
from prefect.events import emit_event


def emit_on_complete(flow: Flow, flow_run: FlowRun, state: State):
    """State change hook that will run upon the `flow_run` entering a `Completed` state"""
    print(
        f"hello from {flow_run.name}'s completion hook |"
        f" the return value was {(r := state.result())!r}"
    )
    emit_event(
        event="prefect.result.produced",  # this is an arbitrary event name
        resource={
            "prefect.resource.id": (
                f"prefect.result.{flow_run.deployment_id}.{flow_run.id}"
            )
        },
        payload={"result": r},
    )
```
m
You can use state change hooks in Prefect to execute code when a flow or task run enters a `Completed` state, including retrieving and using the flow name. Here's an example of an `on_completion` hook that retrieves the flow name and includes it, along with the flow's return value, in the emitted event:
### Example
```python
from prefect import flow, task
from prefect.events import emit_event


def emit_on_complete(flow, flow_run, state):
    """State change hook that will run upon the `flow_run` entering a `Completed` state"""
    # on_completion hooks only run for Completed states; this check is a safeguard
    if state.is_completed():
        flow_name = flow.name  # the flow's name, derived from the function name by default
        flow_run_name = flow_run.name
        result = state.result()  # the flow's return value, not its duration

        print(
            f"hello from {flow_run_name}'s completion hook |"
            f" the return value was {result!r}"
        )

        emit_event(
            event="prefect.result.produced",  # this is an arbitrary event name
            resource={
                "prefect.resource.id": (
                    f"prefect.result.{flow_run.deployment_id}.{flow_run.id}"
                )
            },
            payload={"result": result, "flow_name": flow_name},
        )


@task
def my_task():
    return 42


@flow(on_completion=[emit_on_complete])
def my_flow():
    result = my_task()
    return result


if __name__ == "__main__":
    my_flow()
```
### Key Points:
1. State Change Hooks: Prefect allows you to define state change hooks that execute code when a task or flow enters a specific state. These can be used for custom logging, notifications, or triggering other workflows.
2. Flow Name: The `flow` argument is the flow object itself, so `flow.name` gives the flow's name (by default derived from the function name).
3. Flow Run Information: The `flow_run` object provides attributes such as `name`, `id`, and `deployment_id` that can be used within the hook; `deployment_id` is `None` when the flow is not run from a deployment.
4. Return Value: `state.result()` returns the flow's return value, not its duration; the duration has to be computed from the flow run's timing fields.

### References:
- State Change Hooks
- Creating and Using Hooks

This approach keeps a clean separation of concerns while remaining flexible and scalable, especially in a Kubernetes context.
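Because the hook receives plain objects, the part that picks the names out of `flow` and `flow_run` can live in an ordinary function and be tested without running a flow. A minimal sketch, using a hypothetical helper `build_completion_event` whose return value is meant to be unpacked into `emit_event(**...)`; the `"no-deployment"` label for ad hoc runs (where `deployment_id` is `None`) is an arbitrary choice.

```python
from typing import Any, Dict


def build_completion_event(flow: Any, flow_run: Any, result: Any) -> Dict[str, Any]:
    """Return keyword arguments for emit_event() built from the hook's arguments."""
    # deployment_id is None when the flow was called directly, not from a deployment
    deployment = flow_run.deployment_id or "no-deployment"
    return {
        "event": "prefect.result.produced",  # arbitrary event name, as in the hook
        "resource": {
            "prefect.resource.id": f"prefect.result.{deployment}.{flow_run.id}"
        },
        "payload": {
            "result": result,
            "flow_name": flow.name,
            "flow_run_name": flow_run.name,
        },
    }
```

In the hook, this becomes `emit_event(**build_completion_event(flow, flow_run, state.result()))`.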
k
@Marvin how can I access the duration of the flow from within an `on_completion` hook?
m
To access the duration of a flow within an `on_completion` hook, you can use the `start_time` and `end_time` attributes of the `flow_run` object. Here's a practical example demonstrating how to achieve this:
### Example
```python
from prefect import Flow, State, flow, task
from prefect.client.schemas.objects import FlowRun
from prefect.events import emit_event


def emit_on_complete(flow: Flow, flow_run: FlowRun, state: State):
    """State change hook that will run upon the `flow_run` entering a `Completed` state"""
    # The flow_run passed to the hook may not have end_time populated yet,
    # so fall back to the timestamp of the Completed state itself.
    end_time = flow_run.end_time or state.timestamp
    if flow_run.start_time and end_time:
        duration = end_time - flow_run.start_time  # a timedelta
        flow_name = flow.name

        print(
            f"hello from {flow_run.name}'s completion hook |"
            f" the duration was {duration!r}"
        )

        emit_event(
            event="prefect.result.produced",  # this is an arbitrary event name
            resource={
                "prefect.resource.id": (
                    f"prefect.result.{flow_run.deployment_id}.{flow_run.id}"
                )
            },
            # send seconds as a float so the aggregator receives a plain number
            payload={"duration": duration.total_seconds(), "flow_name": flow_name},
        )


@task
def my_task():
    return 42


@flow(on_completion=[emit_on_complete])
def my_flow():
    result = my_task()
    return result


if __name__ == "__main__":
    my_flow()
```
### Key Points:
1. Retrieving Start and End Times: The `flow_run` object contains `start_time` and `end_time` attributes, which can be used to calculate the duration.
2. Calculating Duration: The duration is determined by subtracting `start_time` from `end_time`; the result is a `timedelta`.
3. Emitting Events: The `emit_event` call publishes the duration and flow name in its payload, from where they can be forwarded to the external logging aggregator.

### References:
- State Change Hooks in Prefect
- Prefect Flow Run Objects

This method ensures that the flow's duration is tracked and logged in the `on_completion` hook. If you need further refinements or another example, let me know!
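The duration arithmetic itself can be isolated in a small function and checked with stand-in objects. A minimal sketch with a hypothetical helper `flow_run_duration_seconds`; the fallback to `state.timestamp` (the time the Completed state was created) covers the case where `flow_run.end_time` is not yet populated on the object passed to the hook, which is an assumption worth checking against your Prefect version.

```python
from datetime import datetime
from typing import Any, Optional


def flow_run_duration_seconds(flow_run: Any, state: Any) -> Optional[float]:
    """Seconds between flow_run.start_time and flow_run.end_time (or state.timestamp)."""
    start: Optional[datetime] = flow_run.start_time
    # Fall back to the Completed state's timestamp if end_time is not set yet
    end: Optional[datetime] = flow_run.end_time or getattr(state, "timestamp", None)
    if start is None or end is None:
        return None  # not enough information to compute a duration
    return (end - start).total_seconds()
```

In the hook, `payload={"duration": flow_run_duration_seconds(flow_run, state), "flow_name": flow.name}` would replace the inline subtraction.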