Ruslan
02/03/2022, 8:08 AM
Anna Geller
02/03/2022, 10:46 AM
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.tasks.notifications import SlackTask
import time
@task(log_stdout=True)
def hello_world():
    """Print a short notice, then block for one minute."""
    pause_seconds = 60
    print("Sleeping...")
    time.sleep(pause_seconds)
def alert_on_success_upon_prev_failure(obj, old_state, new_state):
    """State handler: send a Slack message when a run succeeds after a failure.

    When ``new_state`` is successful, the most recent *other* run of the same
    flow is looked up through the GraphQL API; if that run's state is
    ``"Failed"``, a Slack notification is sent via ``SlackTask``.

    Args:
        obj: The object the handler is attached to (unused here).
        old_state: The state being left (unused here).
        new_state: The state being entered.

    Returns:
        ``new_state``, unchanged, on every code path.
    """
    # Guard clause: nothing to do unless the run just succeeded.
    if not new_state.is_successful():
        return new_state

    flow_id = prefect.context.get("flow_id")
    flow_run_id = prefect.context.get("flow_run_id")

    # Restrict the lookup to this flow and exclude the current run; otherwise
    # the "latest" run would be the one executing this handler.
    query = """
    query($flow_id: uuid, $flow_run_id: uuid) {
      flow_run(
        where: {flow_id: {_eq: $flow_id}, id: {_neq: $flow_run_id}}
        order_by: {state_timestamp: desc}
        limit: 1
      ) {
        name
        id
        start_time
        end_time
        state
      }
    }
    """
    response = Client().graphql(
        query=query,
        variables=dict(flow_id=flow_id, flow_run_id=flow_run_id),
    )

    # ``flow_run`` comes back as a list (empty when there is no earlier run).
    previous_runs = response["data"]["flow_run"]
    if previous_runs and previous_runs[0]["state"] == "Failed":
        message = f"Flow run {flow_run_id} succeeded upon previous failure"
        SlackTask(message).run()
    return new_state
# Assemble the flow: a single task, with the success-after-failure handler
# registered as a flow-level state handler.
with Flow(
    name="succeeding_flow",
    state_handlers=[alert_on_success_upon_prev_failure],
) as flow:
    hello_task = hello_world()