Shane Breeze
10/04/2023, 10:20 AM
Marvin
10/04/2023, 10:20 AM
on_completion and on_failure hooks.
In Prefect, both flows and tasks accept on_completion and on_failure options, each taking a list of callable hooks. These hooks are functions that get called when the task or flow completes or fails, respectively. Each hook receives three arguments:
- flow, flow_run, and state in the case of a flow hook
- task, task_run, and state in the case of a task hook
Here's a simple example of how you might add completion hooks to a flow and a task:
```python
from prefect import task, flow


def my_completion_task_hook_1(task, task_run, state):
    print("This is the first hook — Task completed!!!")


def my_completion_task_hook_2(task, task_run, state):
    print("This is the second hook — Task completed!!!")


def my_completion_flow_hook(flow, flow_run, state):
    print("Flow completed!!!")


@task(on_completion=[my_completion_task_hook_1, my_completion_task_hook_2])
def my_task():
    print("This is the task!")


@flow(on_completion=[my_completion_flow_hook])
def my_flow():
    my_task()


if __name__ == "__main__":
    my_flow()
```
You can also include failure hooks through the on_failure option. You can supply both on_completion and on_failure hooks to the same flow or task; only the hooks that match the final state of the flow or task are called.
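To make the "only the relevant hooks run" rule concrete, here is a toy, Prefect-free model of the dispatch. It is not Prefect's implementation: run_with_hooks and the plain-string states "Completed" and "Failed" are hypothetical stand-ins for the real engine and its State objects.

```python
from typing import Any, Callable, Sequence

# Hypothetical hook signature for this toy model: (the callable, its final state name)
Hook = Callable[[Callable[[], Any], str], None]


def run_with_hooks(
    fn: Callable[[], Any],
    on_completion: Sequence[Hook] = (),
    on_failure: Sequence[Hook] = (),
) -> str:
    """Run fn, then call only the hooks that match its final state.

    Toy model only: unlike Prefect, it swallows the exception instead of
    letting it propagate after the failure hooks run.
    """
    try:
        fn()
        state = "Completed"
    except Exception:
        state = "Failed"
    # Pick the hook list that matches the final state; the other list is ignored
    hooks = on_completion if state == "Completed" else on_failure
    for hook in hooks:
        hook(fn, state)
    return state


if __name__ == "__main__":
    def my_task():
        print("This is the task!")

    def broken_task():
        raise ValueError("something went wrong")

    def completion_hook(fn, state):
        print(f"{fn.__name__}: completion hook called ({state})")

    def failure_hook(fn, state):
        print(f"{fn.__name__}: failure hook called ({state})")

    for fn in (my_task, broken_task):
        run_with_hooks(fn, on_completion=[completion_hook], on_failure=[failure_hook])
```

Running it calls only the completion hook for my_task and only the failure hook for broken_task, which mirrors how Prefect picks between the two hook lists.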
To print which tasks completed and which failed, you would modify the hooks to inspect the state argument, which holds the final state of the task or flow (for example with state.is_completed() or state.is_failed()), and print task.name or flow.name accordingly. But remember, this is just a basic example. The real world is much more... complex.
Shane Breeze
10/04/2023, 10:26 AM
Marvin
10/04/2023, 10:26 AM
on_completion hook, detailing which tasks failed and which completed. Well, that's a slightly more complicated request, but nothing we can't handle.
You mentioned that you store the results of all tasks in a dictionary which you return at the end of the flow. That's great! We can use that information to determine which tasks completed and which failed.
Here's a simple example of how you might do this:
```python
from prefect import flow, task


def my_flow_completion_hook(flow, flow_run, state):
    # Get the task run data
    task_run_data = flow_run.task_runs
    # Initialize lists to store the names of completed and failed tasks
    completed_tasks = []
    failed_tasks = []
    # Iterate over the task run data
    for task_run in task_run_data:
        # If the task state is Failed, add it to the failed_tasks list
        if task_run.state.is_failed():
            failed_tasks.append(task_run.name)
        # Otherwise, add it to the completed_tasks list
        else:
            completed_tasks.append(task_run.name)
    # Print the lists of completed and failed tasks
    print(f"Completed tasks: {completed_tasks}")
    print(f"Failed tasks: {failed_tasks}")
    # Here you can add your code to send a Slack notification
    # ...


@task
def my_task():
    # This is your task
    pass


@flow(on_completion=[my_flow_completion_hook])
def my_flow():
    my_task()


if __name__ == "__main__":
    my_flow()
```
This is a basic example; you would need to replace the print statements with your code to send a Slack notification. You might also want to handle other task states, such as Cancelled or Crashed.
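For the Slack part, one option is a Slack incoming webhook, which accepts a JSON POST body with a "text" field. The sketch below uses only the standard library and assumes you have already created an incoming webhook in Slack; the URL is a placeholder, and build_slack_payload and send_slack_notification are hypothetical helper names.

```python
import json
import urllib.request
from typing import Dict, List

# Placeholder: replace with the incoming-webhook URL that Slack gives you
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"


def build_slack_payload(flow_name: str, completed: List[str], failed: List[str]) -> Dict[str, str]:
    """Build the JSON body for a Slack incoming webhook (a single 'text' field)."""
    status = "with failures" if failed else "successfully"
    lines = [
        f"Flow `{flow_name}` finished {status}.",
        f"Completed tasks: {', '.join(completed) or 'none'}",
        f"Failed tasks: {', '.join(failed) or 'none'}",
    ]
    return {"text": "\n".join(lines)}


def send_slack_notification(payload: Dict[str, str], url: str = SLACK_WEBHOOK_URL) -> int:
    """POST the payload to the webhook and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Inside the hook, the two print statements could then become send_slack_notification(build_slack_payload(flow.name, completed_tasks, failed_tasks)). Keeping message building separate from the HTTP call makes the message easy to test without touching the network.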
And remember, life... uh... finds a way. Especially when it comes to bugs. So test thoroughly.
Shane Breeze
10/04/2023, 10:38 AM
flow_run doesn’t seem to have a task_runs attribute
Marvin
10/04/2023, 10:39 AM
flow_run object indeed doesn't have a task_runs attribute. My circuits must have crossed.
In Prefect, the on_completion hook for a flow does not provide a direct way to access the states of the individual tasks that were run within the flow. However, you mentioned that you are storing the results of all tasks in a dictionary which you return at the end of the flow. If you also store the state of each task in this dictionary, you can use this information in your on_completion hook to determine which tasks completed and which failed.
Here's an example of how you might modify your tasks and flow to store the task state in the results dictionary:
```python
from prefect import flow, task


@task
def my_task():
    # This is your task
    pass


def my_flow_completion_hook(flow, flow_run, state):
    # The flow's return value (the results dictionary) is carried by its final state
    results = state.result()
    # Initialize lists to store the names of completed and failed tasks
    completed_tasks = []
    failed_tasks = []
    # Iterate over the results dictionary
    for task_name, task_data in results.items():
        # If the task state is Failed, add it to the failed_tasks list
        if task_data["state"] == "Failed":
            failed_tasks.append(task_name)
        # Otherwise, add it to the completed_tasks list
        else:
            completed_tasks.append(task_name)
    # Print the lists of completed and failed tasks
    print(f"Completed tasks: {completed_tasks}")
    print(f"Failed tasks: {failed_tasks}")
    # Here you can add your code to send a Slack notification
    # ...


# Hooks are attached in the decorator, not passed when the flow is called
@flow(on_completion=[my_flow_completion_hook])
def my_flow():
    # return_state=True returns the task's final State instead of raising,
    # so the flow can keep going and record the failure
    task_state = my_task(return_state=True)
    results = {
        "my_task": {
            # Only fetch the result if the task completed; a failed task has none
            "result": task_state.result() if task_state.is_completed() else None,
            # Store the state name (e.g. "Completed", "Failed"), not the State object
            "state": task_state.name,
        }
    }
    return results


if __name__ == "__main__":
    my_flow()
```
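This is a basic example; you would need to replace the print statements with your code to send a Slack notification. You might also want to handle other task states, such as Cancelled or Crashed. Keep in mind, too, that on_completion hooks only run when the flow itself completes; if you also want a report when the flow run fails, register a separate hook under on_failure.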
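To handle more than two outcomes, the hook could group task names by state name instead of splitting them into two lists. A sketch, assuming the results dictionary maps each task name to a dict whose "state" entry is a state name string such as "Completed", "Failed", "Cancelled", or "Crashed" (for example taken from state.name); group_tasks_by_state and format_summary are hypothetical helpers.

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_tasks_by_state(results: Dict[str, Dict[str, Any]]) -> Dict[str, List[str]]:
    """Map each state name (e.g. "Completed", "Failed") to the task names in that state."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for task_name, task_data in results.items():
        groups[task_data["state"]].append(task_name)
    return dict(groups)


def format_summary(groups: Dict[str, List[str]]) -> str:
    """One line per state, sorted by state name so the message is stable."""
    return "\n".join(f"{state}: {', '.join(names)}" for state, names in sorted(groups.items()))
```

Because unknown states simply get their own group, a new outcome shows up in the summary instead of being silently counted as "completed" by an else branch.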
And remember, life... uh... finds a way. Especially when it comes to bugs. So test thoroughly.