KG
06/11/2024, 9:12 PM
Marvin
06/11/2024, 9:12 PM
`on_completion` argument with a callable to execute a final task within the same flow run. Prefect documentation does provide insight into task dependency management and using task states:
- Task Runners in Prefect
For controlling execution flow, Prefect uses constructs like `.wait()` and `wait_for=[]` to manage task dependencies, ensuring tasks run in a specific order. However, this isn't the specific `on_completion` behavior you're asking about.
Given these findings, it would be best to explore Prefect's dependency management for ensuring tasks run in sequence or reach out to the Prefect community or forums for more detailed use cases.
If you'd like to delve deeper into this topic or need a practical code example, let me know!
KG
06/11/2024, 9:20 PM
Marvin
06/11/2024, 9:21 PM
Marvin
06/11/2024, 9:21 PM
#### Option 2: Event-Based
```python
import time

from prefect import flow, task


@flow
def my_flow():
    # Calling a task inside a flow returns its result, here the duration in seconds
    duration = my_task()
    emit_event('task_completed', {'task_name': 'my_task', 'duration': duration})


@task
def my_task():
    # The run logger does not expose start/end times, so time the work directly
    start_time = time.perf_counter()
    # Task processing...
    end_time = time.perf_counter()
    return end_time - start_time


@task
def event_listener(event):
    # Forward 'task_completed' events to the logging aggregator
    if event['name'] == 'task_completed':
        task_name = event['data']['task_name']
        duration = event['data']['duration']
        send_to_logging_aggregator(task_name, duration)


def emit_event(event_name, data):
    # Emit event logic (placeholder)
    pass


def send_to_logging_aggregator(task_name, duration):
    # Send to logging aggregator logic (placeholder)
    pass
```
Both solutions have their merits. Selecting the best approach will depend on your specific needs and the scale of your deployment.
KG
06/11/2024, 9:51 PM
```python
def emit_on_complete(flow: Flow, flow_run: FlowRun, state: State):
    """State change hook that will run upon the `flow_run` entering a `Completed` state"""
    print(
        f"hello from {flow_run.name}'s completion hook |"
        f" the return value was {(r := state.result())!r}"
    )
    emit_event(
        event="prefect.result.produced",  # this is an arbitrary event name
        resource={
            "prefect.resource.id": (
                f"prefect.result.{flow_run.deployment_id}.{flow_run.id}"
            )
        },
        payload={"result": r},
    )
```
Marvin
06/11/2024, 9:52 PM
`Completed` state, which includes retrieving and using the flow name.
Here's an example of how you can create an `on_completion` hook that retrieves the flow name and sends the flow's return value to an external logging aggregator:
### Example
```python
from prefect import State, flow, task


def emit_on_complete(flow, flow_run, state: State):
    """State change hook that will run upon the `flow_run` entering a `Completed` state"""
    # `Completed` is a state factory rather than a class, so isinstance() cannot
    # be used with it; ask the state for its type instead
    if state.is_completed():
        flow_name = flow.name
        flow_run_name = flow_run.name
        # state.result() is the flow's return value (42 here), not a duration
        result = state.result()
        print(
            f"hello from {flow_run_name}'s completion hook |"
            f" the return value was {result!r}"
        )
        emit_event(
            event="prefect.result.produced",  # this is an arbitrary event name
            resource={
                "prefect.resource.id": (
                    f"prefect.result.{flow_run.deployment_id}.{flow_run.id}"
                )
            },
            payload={"result": result, "flow_name": flow_name},
        )


def emit_event(event, resource, payload):
    # Mock function to send event to logging aggregator
    print(f"Emitting event {event} with resource {resource} and payload {payload}")


@task
def my_task():
    return 42


@flow(on_completion=[emit_on_complete])
def my_flow():
    result = my_task()
    return result


if __name__ == "__main__":
    my_flow()
```
### Key Points:
1. State Change Hooks: Prefect allows you to define state change hooks that will execute code when a task or flow enters a specific state. This can be used for custom logging, notifications, or triggering other workflows.
2. Flow and Task Information: The `flow_run` object provides attributes such as `name`, `id`, and `deployment_id` that can be used within the hook.
3. Result Access: The flow's return value is available in the hook via `state.result()`. Note that this is the value the flow returned (here `42`), not a run duration; timing information lives on the `flow_run` object.
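As a minimal sketch of point 2, the snippet below assembles the event name, resource, and payload from the same three attributes the hook reads. `FlowRunInfo` and `build_event` are hypothetical names introduced only for illustration; `FlowRunInfo` is a plain stand-in for the real `flow_run` object so the example runs without Prefect installed.

```python
from dataclasses import dataclass
from typing import Any, Optional
from uuid import UUID


@dataclass
class FlowRunInfo:
    """Hypothetical stand-in for the attributes the hook reads from flow_run."""
    name: str
    id: UUID
    deployment_id: Optional[UUID] = None  # None when the run has no deployment


def build_event(flow_name: str, run: FlowRunInfo, result: Any) -> dict:
    """Assemble the event name, resource, and payload used in the hook."""
    return {
        "event": "prefect.result.produced",  # arbitrary event name
        "resource": {
            "prefect.resource.id": f"prefect.result.{run.deployment_id}.{run.id}"
        },
        "payload": {"result": result, "flow_name": flow_name},
    }


run = FlowRunInfo(name="demo-run", id=UUID(int=1))
print(build_event("my_flow", run, 42)["resource"]["prefect.resource.id"])
```

When the flow is called directly rather than through a deployment, `deployment_id` is `None`, so the resource id contains the literal text `None`. Substituting a placeholder label in that case may be worth considering.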
### References:
- State Change Hooks
- Creating and Using Hooks
This approach maintains a clean separation of concerns while making the operation flexible and scalable, especially in a Kubernetes context.
KG
06/11/2024, 11:13 PM
Marvin
06/11/2024, 11:13 PM
`on_completion` hook, you can use the `start_time` and `end_time` attributes from the `flow_run` object. Here's a practical example demonstrating how to achieve this:
### Example
```python
from prefect import Flow, State, flow, task
# FlowRun is not exported from the top-level prefect package
from prefect.client.schemas.objects import FlowRun


def emit_on_complete(flow: Flow, flow_run: FlowRun, state: State):
    """State change hook that will run upon the `flow_run` entering a `Completed` state"""
    # Either timestamp may be unset; return early rather than use an undefined duration
    if not (flow_run.start_time and flow_run.end_time):
        print(f"{flow_run.name}: start/end time unavailable, skipping event")
        return
    duration = flow_run.end_time - flow_run.start_time
    flow_name = flow.name
    print(
        f"hello from {flow_run.name}'s completion hook |"
        f" the duration was {duration!r}"
    )
    emit_event(
        event="prefect.result.produced",  # this is an arbitrary event name
        resource={
            "prefect.resource.id": (
                f"prefect.result.{flow_run.deployment_id}.{flow_run.id}"
            )
        },
        # Send seconds as a float so the payload stays JSON-serializable
        payload={"duration": duration.total_seconds(), "flow_name": flow_name},
    )


def emit_event(event, resource, payload):
    # Mock function to send event to logging aggregator
    print(f"Emitting event {event} with resource {resource} and payload {payload}")


@task
def my_task():
    return 42


@flow(on_completion=[emit_on_complete])
def my_flow():
    result = my_task()
    return result


if __name__ == "__main__":
    my_flow()
```
### Key Points:
1. Retrieving Start and End Times: The `flow_run` object contains `start_time` and `end_time` attributes which can be used to calculate the duration.
2. Calculating Duration: The duration is determined by subtracting the `start_time` from the `end_time`.
3. Emitting Events: The `emit_event` function sends the duration and flow name to the external logging aggregator.
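The subtraction in point 2 can be sketched on its own with plain `datetime` values. `duration_seconds` is a hypothetical helper, not part of Prefect; it assumes either timestamp may be missing and returns `None` in that case instead of raising.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def duration_seconds(start: Optional[datetime], end: Optional[datetime]) -> Optional[float]:
    """Return end - start in seconds, or None if either timestamp is missing."""
    if start is None or end is None:
        return None
    return (end - start).total_seconds()


start = datetime(2024, 6, 11, 21, 0, 0, tzinfo=timezone.utc)
end = start + timedelta(minutes=1, seconds=30)
print(duration_seconds(start, end))   # 90.0
print(duration_seconds(start, None))  # None
```

Returning a float rather than a `timedelta` keeps the value easy to serialize when it is sent to a logging aggregator.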
### References:
- State Change Hooks in Prefect
- Prefect Flow Run Objects
This method ensures that the flow's duration is accurately tracked and logged in the `on_completion` hook. If you need further refinements or another example, let me know!