# ask-community
s
Hi, I wanted to know if I could iterate over all tasks in a flow’s on_completion/on_failure hook. My use case is to send a Slack notification with a summary of task statuses, e.g. 🟢 Task 1 🔴 Task 2 🟢 Task 3. I could use each task’s on_completion/on_failure hook, but that would send a separate Slack message for each task, which is too verbose. I’d like a single message at the end of the flow.
j
I think the flow context is no longer available by the time the hook executes. But what about getting that flow's task runs from the Prefect API?
from prefect import flow, task
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter

@task
def task_1():
    raise Exception

@task
def task_2():
    pass

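async def hook(flow, flow_run, state):
    # Open the client as an async context manager so it is closed afterwards,
    # then fetch every task run that belongs to this flow run.
    async with get_client() as client:
        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter(id={"any_": [flow_run.id]})
        )

    message = ""
    for task_run in task_runs:
        message += f"Task {task_run.name} has final state {task_run.state_name}.\n"

    print(message)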

@flow(on_completion=[hook], on_failure=[hook])
def main() -> None:
    task_1.submit()
    task_2.submit()


if __name__ == "__main__":
    main()
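
To get from the printed lines to the single emoji summary asked about above, something like the following could work. It is only a sketch: `STATE_EMOJI`, `format_summary` and `build_slack_request` are hypothetical names, not part of Prefect, the emoji mapping is an assumption, and the task run names are made-up examples. It also assumes a Slack incoming webhook, which accepts a JSON body with a `text` field.

```python
import json
from urllib.request import Request

# Hypothetical mapping from state names to emoji; extend as needed.
STATE_EMOJI = {"Completed": "🟢", "Failed": "🔴", "Crashed": "🔴"}


def format_summary(task_states):
    """Build a one-line summary from (task_name, state_name) pairs."""
    parts = []
    for name, state_name in task_states:
        # Fall back to a neutral marker for states not in the mapping.
        emoji = STATE_EMOJI.get(state_name, "⚪")
        parts.append(f"{emoji} {name}")
    return " ".join(parts)


def build_slack_request(webhook_url, text):
    """Prepare (but do not send) a POST to a Slack incoming webhook."""
    body = json.dumps({"text": text}).encode("utf-8")
    return Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Illustrative input; in the hook these pairs would come from the task runs.
summary = format_summary([("task_1-0", "Failed"), ("task_2-0", "Completed")])
print(summary)
```

Inside the hook, the pairs could be built with `[(tr.name, tr.state_name) for tr in task_runs]`, and the prepared request sent with `urllib.request.urlopen(...)`, so the whole flow produces one message.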
s
That’s exactly what I need, thanks. I haven’t used the prefect api much - is it in the docs somewhere?
e
this is the way