https://prefect.io logo
Title
s

Surat Mukker

09/15/2022, 3:54 AM
Hello two new questions, 1. our use case requires that we have only one flow run of a deployment executing at a given time,. i.e if it a flow run running behind for any reason, we do not want the next flow run of the same deployment to start executing even if it is scheduled to run, we would prefer to have the next run cancelled. Is there a setting for this in Prefect 2.x. Using Queues to control this concurrency will require us to create a queue per deployment, which is not something we want since we have many deployments and creating a new queue for each deployment is going to make for a very complex system. Is there another way we are missing? 2. Prefect 2 schedules next 100 runs of deployment when it is scheduled. Is there a setting we can use to reduce this number?
p

Paul-Etienne Coisne

09/15/2022, 12:08 PM
Hi, I’m completely new to Prefect (since yesterday!) but regarding #1 couldn’t you have an early return that’d look for the previous flow_run and if the status is running then reschedule the current run?
b

Britt Evans

09/15/2022, 4:01 PM
I’ve been using this at the start of my flows (version 2.0b9) to check if they’re already running from the same deployment
async def is_already_running(flow_ctx: FlowRunContext) -> bool:
    """Return true if the given flow run context has a deployment already running in RUNNING state"""
    if not flow_ctx.flow_run.deployment_id:
        return False
    runs = await flow_ctx.client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            id=FlowRunFilterId(not_any_=[flow_ctx.flow_run.id]),
            deployment_id=FlowRunFilterDeploymentId(
                any_=[flow_ctx.flow_run.deployment_id]
            ),
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.RUNNING])
            ),
        )
    )
    n_flows = len(runs)
    return n_flows > 0
Usage:
from prefect import flow, get_run_logger, context

@flow(name="My Flow")
async def my_flow():
    logger = get_run_logger()
    if await is_already_running(context.get_run_context()):
        <http://logger.info|logger.info>("Skipping, already running")
        return
s

Surat Mukker

09/16/2022, 5:43 AM
Thanks @Paul-Etienne Coisne and @Britt Evans. We are hoping for handling this in the orchestration layer to avoid having this code block repeated across all flows. Yes it's only 4 lines but it is repeating code that if handled in orchestration layer will be in a single file.
c

Christopher Boyd

09/19/2022, 2:40 PM
Hi Surat, If I understand you right, you want to limit flow RUNS when a flow run of a given flow is currently executing. If that’s the case, that functionality would need to be introduced as suggested above. There are some various configuration options available to you, such as concurrency, work-queues, and pausing and resuming flows. That said, the agents are intended for work-queues and deployments / flows. Any flow run that is created, is created as a sub-run or element of the flow
All that to say, that functionality does not currently exist as I think you would ideally like to use it (if flowRun of flow is executing, don’t run anything else)
s

Surat Mukker

09/19/2022, 7:28 PM
Thanks @Christopher Boyd. We found an open request for this feature: https://github.com/PrefectHQ/prefect/issues/5623
🙌 1
Found the answer to the question 2 about limited # of scheduled runs for a deployment here: https://github.com/PrefectHQ/prefect/blob/main/docs/concepts/schedules.md