In Prefect 1.0, is there a way to run the flow nex...
# prefect-community
n
In Prefect 1.0, is there a way to run the next scheduled flow run only if the previous run was successful? Looking for programmatic solutions, but others are also welcome! Thanks.
a
Yes, here is how you can do this:
from datetime import timedelta
from prefect import task, Flow
from prefect.schedules.clocks import IntervalClock
from prefect.schedules import Schedule
import pendulum
import prefect
from prefect.client import Client
from prefect.engine.state import Failed
import time


schedule = Schedule(clocks=[IntervalClock(timedelta(minutes=5))])


def fail_if_last_flow_run_failed(obj, old_state, new_state):
    if new_state.is_running():
        client = Client()
        query = """
            query ($flow_id: uuid, $state_timestamp: timestamptz) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}}, {state_timestamp: {_lt: $state_timestamp}}, {state: {_neq: "Scheduled"}}]}
                order_by: {state_timestamp: desc}
                limit: 1
              ) {
                name
                state
                end_time
                state_timestamp
              }
            }
        """
        response = client.graphql(
            query=query,
            variables=dict(
                flow_id=prefect.context.flow_id,
                state_timestamp=pendulum.now(tz="UTC").isoformat(),
            ),
        )
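        runs = response["data"]["flow_run"]
        # Guard against an empty result, e.g. on the very first run of the flow
        last_flow_run = runs[0] if runs else None

        if last_flow_run and last_flow_run["state"] == "Failed":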
            logger = prefect.context.get("logger")
            message = "Failing this flow run since the last flow run ended in Failure"
            logger.info(message)
            return Failed(message)
    return new_state


@task(log_stdout=True)
def hello_world():
    print("hello world")


with Flow("hello", schedule=schedule, state_handlers=[fail_if_last_flow_run_failed]) as flow:
    hw = hello_world()
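The handler reads the most recent run out of the GraphQL response. As a minimal sketch, assuming the response has the `{"data": {"flow_run": [...]}}` shape used above, that step could be pulled into small helpers that also cope with an empty result (for example, the very first run of a flow). The names `last_run_state` and `should_block` are hypothetical and not part of Prefect:

```python
from typing import Any, Dict, Optional


def last_run_state(response: Dict[str, Any]) -> Optional[str]:
    """Return the state of the most recent flow run, or None if there is none."""
    runs = response.get("data", {}).get("flow_run") or []
    return runs[0]["state"] if runs else None


def should_block(response: Dict[str, Any]) -> bool:
    """True only when an earlier run exists and it ended in "Failed"."""
    return last_run_state(response) == "Failed"


# Hand-written sample responses, not real backend output
print(should_block({"data": {"flow_run": [{"state": "Failed"}]}}))
print(should_block({"data": {"flow_run": []}}))
```

Inside the state handler, `should_block(response)` would then decide whether to return `Failed(message)` or `new_state`.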
n
Will go through this, Anna! Thanks
👍 1
a
instead of returning Failed, you could also return a Skipped or Cancelled state:
from prefect.engine.state import Failed, Skipped, Cancelled
n
ok
a
I'm referring to this line in the state handler function:
return Failed(message)
n
sure!
Won't this code create perpetual failures after the first run fails?
a
That was what you requested 😄 Does that mean you would prefer to pause the schedule after a failed flow run?
if so, check out Automations in Prefect Cloud. They let you configure this entirely from the Cloud UI: if any flow run from flow X fails, pause the schedule. That way, after a failed run, no further scheduled runs will start until you manually resume the schedule.
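To make the "perpetual failures" concern concrete, here is a small pure-Python sketch that replays the handler's rule ("if the previous run is recorded as Failed, do not run") over a sequence of scheduled runs. It is only a model of that rule and does not call Prefect or the backend; `simulate_runs` and its arguments are hypothetical names:

```python
from typing import List


def simulate_runs(real_outcomes: List[str], blocked_state: str) -> List[str]:
    """Replay the handler's rule over consecutive scheduled runs.

    real_outcomes[i] is how run i would end if it were allowed to execute.
    blocked_state is what the handler returns when the previous run is "Failed".
    """
    recorded: List[str] = []
    for outcome in real_outcomes:
        previous = recorded[-1] if recorded else None
        if previous == "Failed":
            # The handler blocks this run and records blocked_state instead
            recorded.append(blocked_state)
        else:
            recorded.append(outcome)
    return recorded


outcomes = ["Success", "Failed", "Success", "Success"]
print(simulate_runs(outcomes, "Failed"))   # ['Success', 'Failed', 'Failed', 'Failed']
print(simulate_runs(outcomes, "Skipped"))  # ['Success', 'Failed', 'Skipped', 'Success']
```

In this model, returning Failed blocks every later run until someone intervenes, while returning Skipped blocks only the run right after the failure, because the next check sees "Skipped" rather than "Failed".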
n
I thought of something: check whether the previous flow run is still running and, if it is, skip this one.
a
in that case, this is what you're looking for:
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(360)


def skip_if_running_handler(obj, old_state, new_state):
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}},
                {state: {_eq: "Running"}}]}
                limit: 1
                offset: 1
              ) {
                name
                state
                start_time
              }
            }
        """
        response = client.graphql(
            query=query, variables=dict(flow_id=prefect.context.flow_id)
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state


with Flow("skip_if_running", state_handlers=[skip_if_running_handler]) as flow:
    hello_task = hello_world()
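The query above uses `offset: 1` to skip one Running row, presumably the current run itself. As a hedged alternative sketch, the rows could instead be filtered by run id in Python. This assumes the query is changed to also select `id` and to drop `limit`/`offset`, and that the current run's id is available to the handler (for example from `prefect.context`). The helper name `other_active_runs` is hypothetical:

```python
from typing import Any, Dict, List


def other_active_runs(
    rows: List[Dict[str, Any]], current_run_id: str
) -> List[Dict[str, Any]]:
    """Return the active runs that are not the current run."""
    return [row for row in rows if row.get("id") != current_run_id]


# Hand-written sample rows, not real backend output
rows = [
    {"id": "run-a", "state": "Running"},
    {"id": "run-b", "state": "Running"},
]
print(other_active_runs(rows, "run-b"))  # [{'id': 'run-a', 'state': 'Running'}]
```

The handler would then return `Skipped(message)` whenever this list is non-empty.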
🙌 1
n
yes! will take a look. 🙂
👍 1