Britt Evans

07/07/2022, 4:35 PM
Hi, in 2.0 I was wondering if it’s possible for the scheduler to prevent a single flow from “Running” concurrently? For example, if I use an IntervalSchedule with an interval of 5 minutes, and my first flow run takes 8 minutes, can the scheduler skip or wait until the first flow finishes before starting the new flow run?
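
To make the two behaviours in the question concrete, here is a small plain-Python simulation of the 5-minute interval with an 8-minute run. It does not use Prefect at all; the function name `simulate` and the policy labels "skip" and "wait" are made up for illustration and are not scheduler options.

```python
def simulate(interval_min, duration_min, n_ticks, policy):
    """Return (scheduled_minute, actual_start_minute) for each run that executes.

    policy="skip": drop a scheduled tick if the previous run is still going.
    policy="wait": hold the tick until the previous run has finished.
    """
    busy_until = 0  # minute at which the most recent run finishes
    executed = []
    for i in range(n_ticks):
        scheduled = i * interval_min
        if scheduled < busy_until:
            if policy == "skip":
                continue  # previous run still active, drop this tick
            start = busy_until  # "wait": start as soon as the previous run ends
        else:
            start = scheduled
        busy_until = start + duration_min
        executed.append((scheduled, start))
    return executed


skipped = simulate(5, 8, 4, "skip")
waited = simulate(5, 8, 4, "wait")
```

With "skip", every other tick is dropped; with "wait", every tick eventually runs but the start times drift further behind the schedule.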

Kevin Kho

07/07/2022, 4:45 PM
This feature is on our roadmap but isn’t out yet

Britt Evans

07/07/2022, 4:48 PM
Ok, thanks. As a workaround for now, does it make sense to query the orion api as the first task inside my flow to check if a flow with that name is currently running before proceeding?

Kevin Kho

07/07/2022, 4:52 PM
I think that would work
Maybe it’s easier to get the context with this, which should contain some scheduled start time?
from prefect import flow, task
from prefect.context import get_run_context

@flow
def test():
    ctx = get_run_context()
    print(ctx.flow_run.expected_start_time)

redsquare

07/07/2022, 5:05 PM
It would be nice if the context contained the last run time too

Britt Evans

07/07/2022, 5:51 PM
It looks like this will work:
import time
from prefect import flow, get_run_logger, context, task
from prefect.client import OrionClient
from prefect.orion.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterId,
)
from prefect.orion.schemas.states import StateType
from uuid import UUID


@task
async def is_already_running(
    client: OrionClient, my_deployment_id: UUID, my_flow_run_id: UUID
) -> bool:
    """Return true if the given deployment has other existing runs in RUNNING or PENDING states"""
    logger = get_run_logger()
    runs = await client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            id=FlowRunFilterId(not_any_=[my_flow_run_id]),
            deployment_id=FlowRunFilterDeploymentId(any_=[my_deployment_id]),
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.RUNNING, StateType.PENDING])
            ),
        )
    )
    n_flows = len(runs)
    logger.info(f"{n_flows} already running")
    return n_flows > 0


@flow
def slow_flow():
    logger = get_run_logger()
    ctx = context.get_run_context()
    if is_already_running(
        client=ctx.client,
        my_deployment_id=ctx.flow_run.deployment_id,
        my_flow_run_id=ctx.flow_run.id,
    ).result():
        logger.info("Skipping, already running")
        return

    logger.info("Starting now, about to sleep for 10 minutes")
    time.sleep(600)
    logger.info("Waking up, all done")
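
The filter in the snippet above can be dry-run locally without an Orion server. The sketch below mirrors its three conditions (not my own run, same deployment, state is RUNNING or PENDING) over in-memory records. `FakeRun` and `other_active_runs` are hypothetical stand-ins invented for this example and are not part of the Prefect API.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4

# Same two state types the filter above asks for.
ACTIVE_STATES = {"RUNNING", "PENDING"}


@dataclass
class FakeRun:
    """Hypothetical stand-in for a flow run record."""
    id: UUID
    deployment_id: UUID
    state_type: str


def other_active_runs(runs, my_deployment_id, my_flow_run_id):
    """Runs of the same deployment, excluding my own, that are still active."""
    return [
        r for r in runs
        if r.id != my_flow_run_id
        and r.deployment_id == my_deployment_id
        and r.state_type in ACTIVE_STATES
    ]


deployment = uuid4()
me = FakeRun(uuid4(), deployment, "RUNNING")
earlier = FakeRun(uuid4(), deployment, "RUNNING")
finished = FakeRun(uuid4(), deployment, "COMPLETED")
unrelated = FakeRun(uuid4(), uuid4(), "RUNNING")

blocking = other_active_runs([me, earlier, finished, unrelated], deployment, me.id)
should_skip = len(blocking) > 0
```

One thing this makes visible: because each run excludes only its own ID, two runs of the same deployment that are both PENDING or RUNNING when they query will each see the other, so both would skip.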