https://prefect.io logo
Title
n

Naga Sravika Bodapati

05/25/2022, 12:08 PM
In Prefect 1.0, is there a way to run the flow next on schedule only if its earlier run is successful? Looking for program based solutions but other also welcome! Thanks.
1
a

Anna Geller

05/25/2022, 12:18 PM
Yes, here is how you can do this:
from datetime import timedelta
from prefect import task, Flow
from prefect.schedules.clocks import IntervalClock
from prefect.schedules import Schedule
import pendulum
import prefect
from prefect.client import Client
from prefect.engine.state import Failed
import time


schedule = Schedule(clocks=[IntervalClock(timedelta(minutes=5))])


def fail_if_last_flow_run_failed(obj, old_state, new_state):
    if new_state.is_running():
        client = Client()
        query = """
            query ($flow_id: uuid, $state_timestamp: timestamptz) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}}, {state_timestamp: {_lt: $state_timestamp}}, {state: {_neq: "Scheduled"}}]}
                order_by: {state_timestamp: desc}
                limit: 1
              ) {
                name
                state
                end_time
                state_timestamp
              }
            }
        """
        response = client.graphql(
            query=query,
            variables=dict(
                flow_id=prefect.context.flow_id,
                state_timestamp=pendulum.now(tz="UTC").isoformat(),
            ),
        )
        last_flow_run = response["data"]["flow_run"][0]

        if last_flow_run["state"] == "Failed":
            logger = prefect.context.get("logger")
            message = "Failing this flow run since the last flow run ended in Failure"
            <http://logger.info|logger.info>(message)
            return Failed(message)
    return new_state


@task(log_stdout=True)
def hello_world():
    print("hello world")


with Flow("hello", schedule=schedule, state_handlers=[fail_if_last_flow_run_failed]) as flow:
    hw = hello_world()
n

Naga Sravika Bodapati

05/25/2022, 12:19 PM
Will go through this Anna! Thanks
👍 1
a

Anna Geller

05/25/2022, 12:21 PM
instead of returning Failed, you could also return Skipped or Cancelled state
from prefect.engine.state import Failed, Skipped, Cancelled
n

Naga Sravika Bodapati

05/25/2022, 12:22 PM
ok
a

Anna Geller

05/25/2022, 12:22 PM
I'm referring to this line in the state handler function:
return Failed(message)
n

Naga Sravika Bodapati

05/25/2022, 12:22 PM
sure!
Will the code not create a perpetual failures?
after the first one fails?
a

Anna Geller

05/25/2022, 1:09 PM
This was what you requested 😄 does it mean you would prefer to pause schedule after a failed flow run?
if so, check the Automations in PRefect cloud - it allows you to configure it all from the Cloud UI so that you can say: if any flow run from flow X fails, pause schedule - this way, if a run fails, no next scheduled runs will follow until you manually resume the schedule
n

Naga Sravika Bodapati

05/25/2022, 1:10 PM
i thought of something - to check if the state of previous flow is still running then skip this one.
a

Anna Geller

05/25/2022, 1:11 PM
in that case, this is what you're looking for:
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(360)


def skip_if_running_handler(obj, old_state, new_state):
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}},
                {state: {_eq: "Running"}}]}
                limit: 1
                offset: 1
              ) {
                name
                state
                start_time
              }
            }
        """
        response = client.graphql(
            query=query, variables=dict(flow_id=prefect.context.flow_id)
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            <http://logger.info|logger.info>(message)
            return Skipped(message)
    return new_state


with Flow("skip_if_running", state_handlers=[skip_if_running_handler]) as flow:
    hello_task = hello_world()
🙌 1
n

Naga Sravika Bodapati

05/25/2022, 1:11 PM
yes! will take a look. 🙂
👍 1