
Omar Sultan

02/19/2022, 8:55 PM
Hello, I have a flow that is scheduled to run every 30 minutes. Is there a way to prevent a new run from starting if the previous run has not finished yet? Any ideas on how to do that?

Anna Geller

02/19/2022, 10:19 PM
Yes! You can do it using a flow-level state handler:
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(360)


def skip_if_running_handler(obj, old_state, new_state):
    # Only act when this flow run is about to enter a Running state.
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}},
                {state: {_eq: "Running"}}]}
                limit: 1
                offset: 1
              ) {
                name
                state
                start_time
              }
            }
        """
        response = client.graphql(
            query=query, variables=dict(flow_id=prefect.context.flow_id)
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state


with Flow("skip_if_running", state_handlers=[skip_if_running_handler]) as flow:
    hello_task = hello_world()
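
If you want to sanity-check the skip decision without a running Prefect backend, here is a minimal sketch that isolates that part of the handler. The helper names `running_flow_runs` and `should_skip` are hypothetical (they are not part of Prefect), and the response shape is assumed to match what the handler above reads from `client.graphql`:

```python
from typing import Any, Dict, List


def running_flow_runs(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the flow runs listed in a GraphQL response, or [] if none.

    Hypothetical helper: assumes the {"data": {"flow_run": [...]}} shape
    that the handler above indexes into.
    """
    return (response.get("data") or {}).get("flow_run") or []


def should_skip(response: Dict[str, Any]) -> bool:
    """True when the query returned at least one run in progress."""
    return len(running_flow_runs(response)) > 0


if __name__ == "__main__":
    busy = {"data": {"flow_run": [{"name": "run-a", "state": "Running"}]}}
    idle = {"data": {"flow_run": []}}
    print(should_skip(busy), should_skip(idle))
```

Inside the handler you could then call `should_skip(response)` in place of the `if active_flow_runs:` check; the behavior should be the same for well-formed responses.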

Omar Sultan

02/20/2022, 5:15 AM
That is excellent, thank you so much.