<@ULVA73B9P> how can I print the list of tasks tha...
# ask-marvin
s
@Marvin how can I print the list of tasks that completed and failed in the flow on_completion and on_failure hooks?
m
Oh, the joy of hooks. They're like little reminders of how much we love to complicate things. But I suppose you're not here for my musings on the existential dread of programming. You want to know how to print the list of tasks that completed and failed in the flow `on_completion` and `on_failure` hooks. In Prefect, both flows and tasks include `on_completion` and `on_failure` options where a list of callable hooks can be provided. These hooks are functions that get called when a task or flow completes or fails. The callable will receive three arguments:
- `flow`, `flow_run`, and `state` in the case of a flow hook
- `task`, `task_run`, and `state` in the case of a task hook

Here's a simple example of how you might add completion hooks to a flow and a task:
```python
from prefect import task, flow

def my_completion_task_hook_1(task, task_run, state):
    print("This is the first hook - Task completed!!!")

def my_completion_task_hook_2(task, task_run, state):
    print("This is the second hook - Task completed!!!")

def my_completion_flow_hook(flow, flow_run, state):
    print("Flow completed!!!")

@task(on_completion=[my_completion_task_hook_1, my_completion_task_hook_2])
def my_task():
    print("This is the task!")

@flow(on_completion=[my_completion_flow_hook])
def my_flow():
    my_task()

if __name__ == "__main__":
    my_flow()
```
You can also include a failure hook. It's worth noting that you can supply both `on_completion` and `on_failure` hooks to a flow or task. Only the hooks that are relevant to the final state of the flow or task will be called. To print the list of tasks that completed and failed, you would need to modify the hooks to check the `state` argument, which contains information about the task or flow's final state, and print the task or flow name accordingly. But remember, this is just a basic example. The real world is much more... complex.
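One way to act on the `state` argument is sketched below: a single task hook, registered for both `on_completion` and `on_failure`, files the task run's name under its outcome, and a flow hook reads the record back. The `outcomes` dict and the hook names are made up for illustration, and the sketch assumes the hooks run in the same process as the flow, which may not hold with a distributed task runner. Stand-in objects replace real Prefect runs so the snippet runs on its own.

```python
from types import SimpleNamespace

# Hypothetical module-level record, filled in by the task hook.
outcomes = {"completed": [], "failed": []}

def record_outcome(task, task_run, state):
    # Intended for @task(on_completion=[record_outcome], on_failure=[record_outcome])
    bucket = "failed" if state.is_failed() else "completed"
    outcomes[bucket].append(task_run.name)

def report_outcomes(flow, flow_run, state):
    # Intended for @flow(on_completion=[report_outcomes], on_failure=[report_outcomes])
    print(f"Completed tasks: {outcomes['completed']}")
    print(f"Failed tasks: {outcomes['failed']}")

# Simulate the hook calls with stand-ins for the real task run and state objects.
ok_state = SimpleNamespace(is_failed=lambda: False)
bad_state = SimpleNamespace(is_failed=lambda: True)
record_outcome(None, SimpleNamespace(name="extract-0"), ok_state)
record_outcome(None, SimpleNamespace(name="load-0"), bad_state)
report_outcomes(None, None, None)
```

Keeping the record at module level is the simplest option for a sketch; anything shared across processes would need an external store instead.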
s
@Marvin, this prints that the task completed in the task hook. But I want to send a slack notification once the flow has finished which states which tasks failed and which completed. To do this I need to know which tasks failed and which completed from the flow’s on_completion hook. If it helps I store the results of all tasks in a dictionary which I return at the end of the flow
m
Ah, the plot thickens. You want to send a Slack notification from the flow's `on_completion` hook, detailing which tasks failed and which completed. Well, that's a slightly more complicated request, but nothing we can't handle. You mentioned that you store the results of all tasks in a dictionary which you return at the end of the flow. That's great! We can use that information to determine which tasks completed and which failed. Here's a simple example of how you might do this:
```python
from prefect import flow, task

def my_flow_completion_hook(flow, flow_run, state):
    # Get the task run data
    # NOTE: this assumes flow_run exposes a task_runs attribute; as the
    # follow-up below points out, it does not, so this line fails as written
    task_run_data = flow_run.task_runs

    # Initialize lists to store the names of completed and failed tasks
    completed_tasks = []
    failed_tasks = []

    # Iterate over the task run data
    for task_run in task_run_data:
        # If the task state is Failed, add it to the failed_tasks list
        if task_run.state.is_failed():
            failed_tasks.append(task_run.name)
        # Otherwise, add it to the completed_tasks list
        else:
            completed_tasks.append(task_run.name)

    # Print the lists of completed and failed tasks
    print(f"Completed tasks: {completed_tasks}")
    print(f"Failed tasks: {failed_tasks}")

    # Here you can add your code to send a Slack notification
    # ...

@task
def my_task():
    # This is your task
    pass

@flow(on_completion=[my_flow_completion_hook])
def my_flow():
    my_task()

if __name__ == "__main__":
    my_flow()
```
This is a basic example and you would need to replace the print statements with your code to send a Slack notification. Also, you might want to handle other task states as well, such as `Crashed` or `Cancelled`. And remember, life... uh... finds a way. Especially when it comes to bugs. So test thoroughly.
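If more than two outcomes matter, one option is to bucket task names by state name rather than keeping two lists. This is a minimal sketch, assuming you have already collected a mapping of task name to final state name; the mapping and `group_by_state` are hypothetical helpers, not part of Prefect.

```python
from collections import defaultdict

def group_by_state(task_states):
    """Map each state name to the sorted task names that ended in it."""
    groups = defaultdict(list)
    for task_name, state_name in task_states.items():
        groups[state_name].append(task_name)
    return {state: sorted(names) for state, names in groups.items()}

# Hypothetical task names and final state names.
example = {
    "extract": "Completed",
    "transform": "Failed",
    "load": "Crashed",
    "notify": "Completed",
}
grouped = group_by_state(example)
for state_name, names in grouped.items():
    print(f"{state_name}: {names}")
```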
s
@Marvin, the `flow_run` doesn’t seem to have a `task_runs` attribute
m
Ah, the universe is full of surprises, isn't it? I apologize for the confusion. The `flow_run` object indeed doesn't have a `task_runs` attribute. My circuits must have crossed. In Prefect, the `on_completion` hook for a flow does not provide a direct way to access the states of the individual tasks that were run within the flow. However, you mentioned that you are storing the results of all tasks in a dictionary which you return at the end of the flow. If you also store the state of each task in this dictionary, you can use this information in your `on_completion` hook to determine which tasks completed and which failed. Here's an example of how you might modify your tasks and flow to store the task state in the results dictionary:
```python
from prefect import flow, task

@task
def my_task():
    # This is your task
    pass

def my_flow_completion_hook(flow, flow_run, state):
    # Get the results dictionary returned by the flow.
    # Assumption: the flow's return value is still retrievable from the
    # final state when the hook runs (held in memory or persisted).
    results = state.result()

    # Initialize lists to store the names of completed and failed tasks
    completed_tasks = []
    failed_tasks = []

    # Iterate over the results dictionary
    for task_name, task_data in results.items():
        # If the task state is Failed, add it to the failed_tasks list
        if task_data['state'].is_failed():
            failed_tasks.append(task_name)
        # Otherwise, add it to the completed_tasks list
        else:
            completed_tasks.append(task_name)

    # Print the lists of completed and failed tasks
    print(f"Completed tasks: {completed_tasks}")
    print(f"Failed tasks: {failed_tasks}")

    # Here you can add your code to send a Slack notification
    # ...

@flow(on_completion=[my_flow_completion_hook])
def my_flow():
    # return_state=True hands back the task's final state instead of
    # raising inside the flow when the task fails
    task_state = my_task(return_state=True)
    # Store the result and state of the task in the results dictionary
    results = {
        'my_task': {
            'result': task_state.result(raise_on_failure=False),
            'state': task_state,
        }
    }
    return results

if __name__ == "__main__":
    my_flow()
```
This is a basic example and you would need to replace the print statements with your code to send a Slack notification. Also, you might want to handle other task states as well, such as `Crashed` or `Cancelled`. And remember, life... uh... finds a way. Especially when it comes to bugs. So test thoroughly.
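For the Slack step itself, a rough sketch using only the standard library might look like the following. It assumes a Slack incoming webhook, which accepts a JSON body with a `text` field; `build_slack_payload` and `post_to_slack` are hypothetical helper names, and the webhook URL is something you would supply yourself.

```python
import json
import urllib.request

def build_slack_payload(completed_tasks, failed_tasks):
    """Build the JSON body for a Slack incoming webhook message."""
    lines = [
        f"Completed tasks ({len(completed_tasks)}): {', '.join(completed_tasks) or 'none'}",
        f"Failed tasks ({len(failed_tasks)}): {', '.join(failed_tasks) or 'none'}",
    ]
    return {"text": "\n".join(lines)}

def post_to_slack(webhook_url, payload):
    """POST the payload to the webhook and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),  # supplying data makes this a POST
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return response.status

# Hypothetical task names; post_to_slack is not called here to avoid network access.
payload = build_slack_payload(["extract", "transform"], ["load"])
print(payload["text"])
```

Inside the flow hook, the two print statements would be replaced by a call such as `post_to_slack(webhook_url, build_slack_payload(completed_tasks, failed_tasks))`.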