Hi, I wanted to know if I could iterate over all tasks in a flow’s on_completion/on_failure hook. My use case is to send a slack notification with a summary of task status, e.g.
🟢 Task 1
🔴 Task 2
🟢 Task 3
I could use the task’s on_completion/on_failure hook, but that would sent a separate slack message for each task which is too verbose. I’d like it in 1 message at the end of the flow.
j
Justin Trautmann
10/11/2023, 9:40 AM
i think the flow context isn't available anymore at the time of hook execution. but what about getting the task runs of that flow from the prefect api?
Copy code
from prefect import flow, task
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter
@task
def task_1():
raise Exception
@task
def task_2():
pass
async def hook(flow, flow_run, state):
task_runs = await get_client().read_task_runs(flow_run_filter=FlowRunFilter(id={"any_": [flow_run.id]}))
message = ""
for task_run in task_runs:
message += f"Task {task_run.name} has final state {task_run.state_name}.\n"
print(message)
@flow(on_completion=[hook], on_failure=[hook])
def main(
) -> None:
task_1.submit()
task_2.submit()
if __name__ == "__main__":
main()
s
Shane Breeze
10/11/2023, 10:47 AM
That’s exactly what I need, thanks. I haven’t used the prefect api much - is it in the docs somewhere?
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.