Naga Sravika Bodapati
04/08/2022, 6:57 AM
Anna Geller
from datetime import timedelta
from prefect import task, Flow
from prefect.schedules.clocks import IntervalClock
from prefect.schedules import Schedule
import pendulum
import prefect
from prefect.client import Client
from prefect.engine.state import Failed
import time
# Schedule with a single interval clock that ticks every five minutes.
schedule = Schedule(
    clocks=[
        IntervalClock(interval=timedelta(minutes=5)),
    ],
)
def fail_if_last_flow_run_failed(obj, old_state, new_state):
    """State handler that fails a run when the previous run of the flow failed.

    When ``new_state`` is a running state, the backend is queried for the
    most recently updated run of the same flow (by ``state_timestamp``) whose
    state is not ``"Scheduled"``. If that run's state is ``"Failed"``, a
    ``Failed`` state is returned in place of ``new_state``.

    Args:
        obj: The object this handler is attached to (not used here).
        old_state: The state being left (not used here).
        new_state: The state being entered.

    Returns:
        ``Failed(message)`` when the latest matching run is in the
        ``"Failed"`` state; otherwise ``new_state`` unchanged. ``new_state``
        is also returned unchanged when the query finds no earlier run.
    """
    if new_state.is_running():
        client = Client()
        # NOTE(review): the filter uses only flow_id, timestamp and state, so
        # it does not exclude the current run by id -- confirm that the row
        # returned is really the *previous* run.
        query = """
            query ($flow_id: uuid, $state_timestamp: timestamptz) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}}, {state_timestamp: {_lt: $state_timestamp}}, {state: {_neq: "Scheduled"}}]}
                order_by: {state_timestamp: desc}
                limit: 1
              ) {
                name
                state
                end_time
                state_timestamp
              }
            }
        """
        response = client.graphql(
            query=query,
            variables=dict(
                flow_id=prefect.context.flow_id,
                state_timestamp=pendulum.now(tz="UTC").isoformat(),
            ),
        )
        flow_runs = response["data"]["flow_run"]
        # An empty result means there is no earlier run to inspect; indexing
        # [0] here would raise IndexError, so let the run proceed.
        if not flow_runs:
            return new_state
        last_flow_run = flow_runs[0]
        if last_flow_run["state"] == "Failed":
            logger = prefect.context.get("logger")
            message = "Failing this flow run since the last flow run ended in Failure"
            logger.info(message)
            return Failed(message)
    return new_state
@task(log_stdout=True)
def hello_world():
    """Print a fixed greeting; the task is declared with ``log_stdout=True``."""
    greeting = "hello world"
    print(greeting)
# Define the "hello" flow: it uses the module-level schedule and attaches the
# state handler that checks the outcome of the previous run.
with Flow(
    "hello",
    schedule=schedule,
    state_handlers=[fail_if_last_flow_run_failed],
) as flow:
    hw = hello_world()
Naga Sravika Bodapati
04/08/2022, 9:29 AM