Naga Sravika Bodapati
04/08/2022, 6:57 AM
Anna Geller
04/08/2022, 9:20 AM
from datetime import timedelta
from prefect import task, Flow
from prefect.schedules.clocks import IntervalClock
from prefect.schedules import Schedule
import pendulum
import prefect
from prefect.client import Client
from prefect.engine.state import Failed
import time
# Schedule built from a single interval clock with a five-minute interval.
schedule = Schedule(
    clocks=[
        IntervalClock(timedelta(minutes=5)),
    ],
)
def fail_if_last_flow_run_failed(obj, old_state, new_state):
    """State handler that fails this run when the previous run failed.

    When ``new_state`` is a running state, the most recent flow run of the
    same flow (ordered by ``state_timestamp`` descending, excluding runs in
    the ``Scheduled`` state) is fetched through the Prefect GraphQL client.
    If that run's state is ``"Failed"``, a ``Failed`` state is returned in
    place of ``new_state``.

    Args:
        obj: Object the handler is attached to (not used).
        old_state: State being left (not used).
        new_state: State being entered.

    Returns:
        ``Failed(message)`` when the most recent matching flow run is in the
        ``"Failed"`` state; otherwise ``new_state`` unchanged. ``new_state``
        is also returned when the query yields no flow run at all.
    """
    # Only act on the transition into a running state.
    if not new_state.is_running():
        return new_state

    client = Client()
    # NOTE(review): the filter below does not exclude the current flow run by
    # id - confirm that the current run cannot be the row returned here.
    query = """
        query ($flow_id: uuid, $state_timestamp: timestamptz) {
          flow_run(
            where: {_and: [{flow_id: {_eq: $flow_id}}, {state_timestamp: {_lt: $state_timestamp}}, {state: {_neq: "Scheduled"}}]}
            order_by: {state_timestamp: desc}
            limit: 1
          ) {
            name
            state
            end_time
            state_timestamp
          }
        }
    """
    response = client.graphql(
        query=query,
        variables=dict(
            flow_id=prefect.context.flow_id,
            state_timestamp=pendulum.now(tz="UTC").isoformat(),
        ),
    )

    # The result list is empty when no earlier run matches (e.g. the very
    # first run of the flow); indexing [0] unconditionally would raise
    # IndexError inside the handler.
    flow_runs = response["data"]["flow_run"]
    if flow_runs and flow_runs[0]["state"] == "Failed":
        logger = prefect.context.get("logger")
        message = "Failing this flow run since the last flow run ended in Failure"
        logger.info(message)
        return Failed(message)

    return new_state
@task(log_stdout=True)
def hello_world():
    """Print the greeting ``hello world`` to stdout."""
    print("hello world")
# Flow "hello": uses the module-level schedule and registers
# fail_if_last_flow_run_failed as a flow-level state handler.
with Flow(
    "hello",
    schedule=schedule,
    state_handlers=[fail_if_last_flow_run_failed],
) as flow:
    hw = hello_world()
Naga Sravika Bodapati
04/08/2022, 9:29 AM