Hi, in 2.0 I was wondering if it’s possible for th...
# prefect-community
b
Hi, in 2.0 I was wondering if it’s possible for the scheduler to prevent a single flow from “Running” concurrently? For example, if I use an IntervalSchedule with an interval of 5 minutes, and my first flow run takes 8 minutes, can the scheduler skip or wait until the first flow finishes before starting the new flow run?
k
This feature is on our roadmap but isn’t out yet
b
Ok, thanks. As a workaround for now, does it make sense to query the orion api as the first task inside my flow to check if a flow with that name is currently running before proceeding?
k
I think that would work
Maybe it’s easier to get the
context
with this which should contain some scheduled start time?
Copy code
from prefect import flow, task
from prefect.context import get_run_context

@flow
def test():
    ctx = get_run_context()
    print(ctx.flow_run.expected_start_time)
r
be nice if the context contained last run time too
b
It looks like this will work:
Copy code
import time
from prefect import flow, get_run_logger, context, task
from prefect.client import OrionClient
from prefect.orion.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterId,
)
from prefect.orion.schemas.states import StateType
from uuid import UUID


@task
async def is_already_running(
    client: OrionClient, my_deployment_id: UUID, my_flow_run_id: UUID
) -> bool:
    """Return true if the given deployment has other existing runs in RUNNING or PENDING states"""
    logger = get_run_logger()
    runs = await client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            id=FlowRunFilterId(not_any_=[my_flow_run_id]),
            deployment_id=FlowRunFilterDeploymentId(any_=[my_deployment_id]),
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.RUNNING, StateType.PENDING])
            ),
        )
    )
    n_flows = len(runs)
    <http://logger.info|logger.info>(f"{n_flows} already running")
    return n_flows > 0


@flow
def slow_flow():
    logger = get_run_logger()
    ctx = context.get_run_context()
    if is_already_running(
        client=ctx.client,
        my_deployment_id=ctx.flow_run.deployment_id,
        my_flow_run_id=ctx.flow_run.id,
    ).result():
        <http://logger.info|logger.info>("Skipping, already running")
        return

    <http://logger.info|logger.info>("Starting now, about to sleep for 10 minutes")
    time.sleep(600)
    <http://logger.info|logger.info>("Waking up, all done")