Shane Breeze
10/11/2023, 7:50 AM
Justin Trautmann
10/11/2023, 9:40 AM
from prefect import flow, task
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter
@task
def task_1():
    """Example task that always fails by raising a bare ``Exception``."""
    # Deliberate failure so the flow has a non-successful task run to report.
    raise Exception()
@task
def task_2():
    """Example task that does nothing and returns ``None``."""
async def hook(flow, flow_run, state):
    """Print the final state of every task run that belongs to a flow run.

    Written with the ``(flow, flow_run, state)`` signature so it can be
    registered as a flow state-change hook.

    Args:
        flow: The flow the hook was registered on (unused here).
        flow_run: The flow run being reported; its ``id`` is used to select
            the task runs to list.
        state: The state passed to the hook (unused here).
    """
    # Enter the client as an async context manager so its underlying HTTP
    # resources are released when the query is done instead of being leaked.
    async with get_client() as client:
        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter(id={"any_": [flow_run.id]})
        )

    # One line per task run; each line keeps its trailing newline so the
    # printed text is identical to building the string incrementally.
    message = "".join(
        f"Task {task_run.name} has final state {task_run.state_name}.\n"
        for task_run in task_runs
    )
    print(message)
@flow(on_completion=[hook], on_failure=[hook])
def main() -> None:
    """Submit ``task_1`` and ``task_2`` in that order.

    The same ``hook`` is registered for both ``on_completion`` and
    ``on_failure``. The futures returned by ``submit()`` are not kept or
    awaited in this function.
    """
    task_1.submit()
    task_2.submit()
# Run the example flow only when this file is executed as a script.
if __name__ == "__main__":
    main()
Shane Breeze
10/11/2023, 10:47 AM
Justin Trautmann
10/11/2023, 12:00 PM
Emil Christensen
10/11/2023, 12:04 PM