Shane Breeze
10/04/2023, 10:20 AM

Marvin
10/04/2023, 10:20 AM
`on_completion` and `on_failure` hooks.
In Prefect, both flows and tasks accept `on_completion` and `on_failure` options, each taking a list of callable hooks. These hooks are functions that get called when a task or flow completes or fails. Each callable receives three arguments:
- `flow`, `flow_run`, and `state` in the case of a flow hook
- `task`, `task_run`, and `state` in the case of a task hook
Here's a simple example of how you might add completion hooks to a flow and a task:
```python
from prefect import task, flow


def my_completion_task_hook_1(task, task_run, state):
    print("This is the first hook — Task completed!!!")


def my_completion_task_hook_2(task, task_run, state):
    print("This is the second hook — Task completed!!!")


def my_completion_flow_hook(flow, flow_run, state):
    print("Flow completed!!!")


@task(on_completion=[my_completion_task_hook_1, my_completion_task_hook_2])
def my_task():
    print("This is the task!")


@flow(on_completion=[my_completion_flow_hook])
def my_flow():
    my_task()


if __name__ == "__main__":
    my_flow()
```
You can also include a failure hook. It's worth noting that you can supply both `on_completion` and `on_failure` hooks to a flow or task. Only the hooks relevant to the final state of the flow or task will be called.
To print the list of tasks that completed and failed, you would need to modify the hooks to check the `state` argument, which contains information about the task or flow's final state, and print the task or flow name accordingly. But remember, this is just a basic example. The real world is much more... complex.

Shane Breeze
10/04/2023, 10:26 AM

Marvin
10/04/2023, 10:26 AM
`on_completion` hook, detailing which tasks failed and which completed. Well, that's a slightly more complicated request, but nothing we can't handle.
You mentioned that you store the results of all tasks in a dictionary which you return at the end of the flow. That's great! We can use that information to determine which tasks completed and which failed.
Here's a simple example of how you might do this:
```python
from prefect import flow, task


def my_flow_completion_hook(flow, flow_run, state):
    # Get the task run data
    task_run_data = flow_run.task_runs

    # Initialize lists to store the names of completed and failed tasks
    completed_tasks = []
    failed_tasks = []

    # Iterate over the task run data
    for task_run in task_run_data:
        # If the task state is Failed, add it to the failed_tasks list
        if task_run.state.is_failed():
            failed_tasks.append(task_run.name)
        # Otherwise, add it to the completed_tasks list
        else:
            completed_tasks.append(task_run.name)

    # Print the lists of completed and failed tasks
    print(f"Completed tasks: {completed_tasks}")
    print(f"Failed tasks: {failed_tasks}")
    # Here you can add your code to send a Slack notification
    # ...


@task
def my_task():
    # This is your task
    pass


@flow(on_completion=[my_flow_completion_hook])
def my_flow():
    my_task()


if __name__ == "__main__":
    my_flow()
```
This is a basic example, and you would need to replace the print statements with your code to send a Slack notification. You might also want to handle other task states, such as `Crashed` or `Cancelled`.
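For the Slack step, one option is a Slack incoming webhook, which accepts an HTTP POST with a JSON body containing a `text` field. A minimal, standard-library-only sketch: `build_slack_message`, `post_to_slack`, and the `SLACK_WEBHOOK_URL` environment variable are all assumptions, and the webhook URL itself would be created in your own Slack workspace.

```python
import json
import os
import urllib.request


def build_slack_message(completed_tasks, failed_tasks):
    # Hypothetical helper: compose the message text from the hook's two lists
    lines = [
        f"Completed tasks ({len(completed_tasks)}): {', '.join(completed_tasks) or 'none'}",
        f"Failed tasks ({len(failed_tasks)}): {', '.join(failed_tasks) or 'none'}",
    ]
    return "\n".join(lines)


def post_to_slack(text, webhook_url=None):
    # SLACK_WEBHOOK_URL is an assumed environment variable holding the webhook URL
    url = webhook_url or os.environ["SLACK_WEBHOOK_URL"]
    request = urllib.request.Request(
        url,
        data=json.dumps({"text": text}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # Returns the HTTP status code of Slack's response
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Inside the hook, the two `print` calls would become `post_to_slack(build_slack_message(completed_tasks, failed_tasks))`.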
And remember, life... uh... finds a way. Especially when it comes to bugs. So test thoroughly.

Shane Breeze
10/04/2023, 10:38 AM
`flow_run` doesn't seem to have a `task_runs` attribute

Marvin
10/04/2023, 10:39 AM
The `flow_run` object indeed doesn't have a `task_runs` attribute. My circuits must have crossed.
In Prefect, the `on_completion` hook for a flow does not provide a direct way to access the states of the individual tasks that ran within the flow. However, you mentioned that you store the results of all tasks in a dictionary which you return at the end of the flow. If you also store the state of each task in this dictionary, your `on_completion` hook can read the dictionary back from the flow's final state (the hook's `state` argument) and use it to determine which tasks completed and which failed.
Here's an example of how you might modify your tasks and flow to store the task state in the results dictionary:
```python
from prefect import flow, task


@task
def my_task():
    # This is your task
    pass


def store_task_result(task_name, task_state):
    # Plain helper (not a task): record a task's result and the name of its
    # final state (e.g. "Completed" or "Failed") in a dictionary
    task_result = task_state.result() if task_state.is_completed() else None
    return {task_name: {"result": task_result, "state": task_state.name}}


def my_flow_completion_hook(flow, flow_run, state):
    # The flow's return value (the results dictionary) is available
    # from the final state passed to the hook
    results = state.result()

    # Initialize lists to store the names of completed and failed tasks
    completed_tasks = []
    failed_tasks = []

    # Iterate over the results dictionary
    for task_name, task_data in results.items():
        # If the task state is Failed, add it to the failed_tasks list
        if task_data["state"] == "Failed":
            failed_tasks.append(task_name)
        # Otherwise, add it to the completed_tasks list
        else:
            completed_tasks.append(task_name)

    # Print the lists of completed and failed tasks
    print(f"Completed tasks: {completed_tasks}")
    print(f"Failed tasks: {failed_tasks}")
    # Here you can add your code to send a Slack notification
    # ...


@flow(on_completion=[my_flow_completion_hook])
def my_flow():
    # return_state=True returns the task's final State instead of its result,
    # so a failing task does not raise an exception inside the flow
    task_state = my_task(return_state=True)
    results = store_task_result("my_task", task_state)
    return results


if __name__ == "__main__":
    my_flow()
```
This is a basic example, and you would need to replace the print statements with your code to send a Slack notification. You might also want to handle other task states, such as `Crashed` or `Cancelled`.
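One way to cover every outcome, rather than splitting tasks into "Failed" and "everything else", is to group task names by the stored state name (Prefect 2 state names include "Completed", "Failed", "Cancelled", and "Crashed"). A minimal, Prefect-independent sketch, assuming each dictionary entry has the `{"result": ..., "state": <state name>}` shape used in the example above; `group_tasks_by_state` and `format_summary` are hypothetical helpers.

```python
from collections import defaultdict


def group_tasks_by_state(results):
    # results maps task name -> {"result": ..., "state": <state name string>}
    groups = defaultdict(list)
    for task_name, task_data in results.items():
        groups[task_data["state"]].append(task_name)
    # Return a plain dict so later lookups don't silently create empty keys
    return dict(groups)


def format_summary(groups):
    # One line per state, sorted by state name for a stable message
    return "\n".join(
        f"{state_name} ({len(names)}): {', '.join(names)}"
        for state_name, names in sorted(groups.items())
    )
```

In the hook, `print(format_summary(group_tasks_by_state(results)))` could stand in for the two fixed lists, and the same text could be sent to Slack.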
And remember, life... uh... finds a way. Especially when it comes to bugs. So test thoroughly.