
Britt Evans

07/07/2022, 4:35 PM
Hi, in 2.0 I was wondering if it’s possible for the scheduler to prevent a single flow from “Running” concurrently? For example, if I use an IntervalSchedule with an interval of 5 minutes, and my first flow run takes 8 minutes, can the scheduler skip or wait until the first flow finishes before starting the new flow run?
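To make the timing in this question concrete, here is a minimal sketch in plain Python (no Prefect calls) of the "skip while the previous run is still going" behaviour being asked about. The function name `simulate_skip_if_running` and the start time are made up for illustration; this is not how the Prefect scheduler is implemented.

```python
from datetime import datetime, timedelta


def simulate_skip_if_running(start, interval, duration, n_ticks):
    """Return (executed, skipped) lists of scheduled start times.

    A scheduled tick is skipped when the most recently started run
    has not finished by the time the tick fires.
    """
    executed, skipped = [], []
    busy_until = None  # end time of the run currently in progress, if any
    for i in range(n_ticks):
        tick = start + i * interval
        if busy_until is not None and tick < busy_until:
            skipped.append(tick)
        else:
            executed.append(tick)
            busy_until = tick + duration
    return executed, skipped


# 5-minute interval, 8-minute runs, as in the question (start time is arbitrary).
start = datetime(2022, 7, 7, 16, 0)
executed, skipped = simulate_skip_if_running(
    start, timedelta(minutes=5), timedelta(minutes=8), n_ticks=6
)
print([t.strftime("%H:%M") for t in executed])
print([t.strftime("%H:%M") for t in skipped])
```

With these numbers every second scheduled start overlaps the previous run, so under a skip policy the flow would effectively run every 10 minutes.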

Kevin Kho

07/07/2022, 4:45 PM
This feature is on our roadmap but isn’t out yet

Britt Evans

07/07/2022, 4:48 PM
Ok, thanks. As a workaround for now, does it make sense to query the Orion API as the first task inside my flow to check whether a flow with that name is currently running before proceeding?

Kevin Kho

07/07/2022, 4:52 PM
I think that would work
Maybe it’s easier to get the context with this, which should contain a scheduled start time?
from prefect import flow, task
from prefect.context import get_run_context

@flow
def test():
    ctx = get_run_context()
    print(ctx.flow_run.expected_start_time)

redsquare

07/07/2022, 5:05 PM
be nice if the context contained last run time too

Britt Evans

07/07/2022, 5:51 PM
It looks like this will work:
import time
from prefect import flow, get_run_logger, context, task
from prefect.client import OrionClient
from prefect.orion.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterId,
)
from prefect.orion.schemas.states import StateType
from uuid import UUID


@task
async def is_already_running(
    client: OrionClient, my_deployment_id: UUID, my_flow_run_id: UUID
) -> bool:
    """Return True if the given deployment has other runs in a RUNNING or PENDING state."""
    logger = get_run_logger()
    runs = await client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            id=FlowRunFilterId(not_any_=[my_flow_run_id]),
            deployment_id=FlowRunFilterDeploymentId(any_=[my_deployment_id]),
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.RUNNING, StateType.PENDING])
            ),
        )
    )
    n_flows = len(runs)
    logger.info(f"{n_flows} already running")
    return n_flows > 0


@flow
def slow_flow():
    logger = get_run_logger()
    ctx = context.get_run_context()
    if is_already_running(
        client=ctx.client,
        my_deployment_id=ctx.flow_run.deployment_id,
        my_flow_run_id=ctx.flow_run.id,
    ).result():
        logger.info("Skipping, already running")
        return

    logger.info("Starting now, about to sleep for 10 minutes")
    time.sleep(600)
    logger.info("Waking up, all done")
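For anyone reading the filter above, here is a rough plain-Python model of what the three conditions select, assuming they are combined with AND (exclude my own run, same deployment, state RUNNING or PENDING). The `Run` dataclass and `other_active_runs` helper are hypothetical stand-ins for illustration, not Prefect classes or API calls.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class Run:
    """Hypothetical stand-in for a flow run record; not a Prefect class."""
    id: UUID
    deployment_id: UUID
    state_type: str


ACTIVE_STATES = {"RUNNING", "PENDING"}


def other_active_runs(runs, deployment_id, my_run_id):
    """Mirror the three filter conditions: not me, same deployment, active state."""
    return [
        r for r in runs
        if r.id != my_run_id
        and r.deployment_id == deployment_id
        and r.state_type in ACTIVE_STATES
    ]


dep_a, dep_b = uuid4(), uuid4()
me = Run(uuid4(), dep_a, "RUNNING")          # the run doing the check
older = Run(uuid4(), dep_a, "RUNNING")       # earlier run, still going
done = Run(uuid4(), dep_a, "COMPLETED")      # finished run, ignored
unrelated = Run(uuid4(), dep_b, "RUNNING")   # different deployment, ignored
all_runs = [me, older, done, unrelated]

blocking = other_active_runs(all_runs, dep_a, me.id)
print(len(blocking))  # only `older` matches all three conditions
```

Excluding the current run's own id matters because the run doing the check is itself in a RUNNING state, so without that condition it would always find at least one match and skip every time.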