<@ULVA73B9P> I’m trying to set up a concurrency li...
# ask-marvin
@Marvin I’m trying to set up a concurrency limit for my deployment in Prefect. The idea is simple: if there is already a flow run in a Running or Pending state and another flow run is triggered, the new one should be automatically cancelled.

I’m using Prefect 2.19 and followed the documentation: https://docs-2.prefect.io/latest/concepts/tasks/#configuring-concurrency-limits

Command used:
prefect concurrency-limit create limit-concurrency-one 1

I added the limit-concurrency-one tag in the tags field of my deployment. However, even after creating the concurrency limit, I was still able to run two flow runs at the same time.

My deployment:
- name: long-running-test
  version: 1
  tags: ['test', 'limit-concurrency-one']
  description: 'Flow de teste para verificar concurrency limits'
  entrypoint: scripts/deployments/databricks_loader_schedule/flows/teste_flow.py:long_running_flow
  work_pool:
    <<: *work_pool_default
    job_variables:
      <<: *job_variables_default
      cpu_request: 300m
      memory_request: 400Mi
      cpu_limit: 600m
      memory_limit: 1G
Short answer: what you configured limits tasks, not flow runs, and concurrency limits don’t cancel runs anyway; they queue them. Hence the two flow runs. Details and fixes:

Why your current setup didn’t work
- `prefect concurrency-limit create limit-concurrency-one 1` creates a tag-based limit that applies to task runs. The CLI even says so:
This limit controls how many task runs with that tag may simultaneously be in a Running state.
You added the tag to the deployment (which propagates to the flow run), but tag-based limits in 2.19 do not stop flow runs from starting; they control tasks within runs.
- Even where concurrency limits do apply, they put the blocked run into a waiting state (AwaitingConcurrencySlot in releases that have it) until a slot frees up. They do not auto-cancel the newcomer.

What to use instead (pick one)

1) Prevent overlap by queueing (simple, OSS-compatible)
- Give this deployment a dedicated work queue (or pool) and set its concurrency to 1. Only one flow run from that queue will be handed to a worker at a time.
- Create and limit the queue:
prefect work-queue create serial-long-running-test
prefect work-queue set-concurrency-limit serial-long-running-test 1
- Point your deployment at that queue (set `work_queue_name` under `work_pool` on the deployment). If the queue should live in a specific work pool, pass `--pool` with the pool name to the work-queue commands.
- Docs: https://docs-2.prefect.io/latest/concepts/work-pools/

2) Cancel the new run automatically (Prefect Cloud)
- Use an Automation that cancels new runs when another run of the same deployment is Pending/Running.
- Create a metric-based automation that counts flow runs for this deployment in states [Pending, Running] > 1 and action = CancelFlowRun on the newest run.
- Docs: https://docs-2.prefect.io/latest/concepts/automations/

3) Cancel in-code at flow start (OSS or Cloud)
- Add a guard at the beginning of the flow that either
  a) tries to acquire a named concurrency slot immediately (timeout 0) and cancels if unavailable, or
  b) queries for other runs of the same deployment in Pending/Running and cancels if found.

Example using a concurrency slot (fast and simple):
from prefect import flow, get_client, get_run_logger
from prefect.concurrency.sync import concurrency
from prefect.runtime import flow_run
from prefect.states import Cancelled
import anyio


async def _cancel_current_run():
    # Mark this flow run as Cancelled through the API.
    async with get_client() as client:
        await client.set_flow_run_state(flow_run.id, Cancelled())


@flow
def long_running_flow():
    logger = get_run_logger()
    try:
        # concurrency() uses a *global* concurrency limit, not a tag-based one.
        # Create it first: prefect gcl create limit-concurrency-one --limit 1
        # timeout_seconds=0 is intended as "don't wait for a slot"; check how
        # your Prefect version treats a zero timeout.
        with concurrency("limit-concurrency-one", timeout_seconds=0):
            # do the real work here
            ...
    except TimeoutError:
        # No slot was free. Note: a TimeoutError raised by the work itself
        # would also land here.
        logger.info("Another run is active; cancelling this run.")
        anyio.run(_cancel_current_run)
        return
If you prefer a query-based guard instead of a slot, we can share that snippet too.

References (just in case you enjoy reading as much as I do)
- Concurrency limits (2.x): https://docs-2.prefect.io/latest/guides/global-concurrency-limits/
- Task tags/concurrency: https://docs-2.prefect.io/latest/concepts/tasks/#configuring-concurrency-limits
- Automations (Cloud): https://docs-2.prefect.io/latest/concepts/automations/

A couple of clarifications to tailor the fix:
- Are you on Prefect Cloud or OSS?
- Do you want the second run to be cancelled or just queued?
- Can this deployment use a dedicated work queue/pool? If yes, the “concurrency=1 queue” route is the simplest.
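
For reference, the query-based guard from option 3b could look roughly like the sketch below. It is not a tested recipe. The helper names `has_other_active_run` and `cancel_if_overlapping` are made up for illustration, and it assumes the filter classes in `prefect.client.schemas.filters` and `StateType` in `prefect.client.schemas.objects` are available in your 2.19 install. Prefect is imported inside the function only so that the small comparison helper can be exercised on its own.

```python
from typing import Iterable


def has_other_active_run(current_run_id, active_run_ids: Iterable) -> bool:
    """True if any active run ID differs from the current run's own ID."""
    current = str(current_run_id)
    return any(str(run_id) != current for run_id in active_run_ids)


async def cancel_if_overlapping(deployment_id, current_run_id) -> bool:
    """Cancel the current run if its deployment has another Pending/Running run.

    Returns True when the current run was cancelled. Hypothetical helper.
    """
    if deployment_id is None:
        # Not started from a deployment (e.g. a local call): nothing to compare.
        return False

    # Lazy imports: assumes these modules exist in the installed Prefect 2.x.
    from prefect import get_client
    from prefect.client.schemas.filters import (
        DeploymentFilter,
        DeploymentFilterId,
        FlowRunFilter,
        FlowRunFilterState,
        FlowRunFilterStateType,
    )
    from prefect.client.schemas.objects import StateType
    from prefect.states import Cancelled

    async with get_client() as client:
        runs = await client.read_flow_runs(
            deployment_filter=DeploymentFilter(
                id=DeploymentFilterId(any_=[deployment_id])
            ),
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(
                        any_=[StateType.PENDING, StateType.RUNNING]
                    )
                )
            ),
        )
        # The current run is itself Running, so it has to be excluded.
        if has_other_active_run(current_run_id, [run.id for run in runs]):
            await client.set_flow_run_state(
                current_run_id, Cancelled(message="Another run is already active.")
            )
            return True
    return False


# Usage at the top of the flow body (sketch):
#     from prefect.runtime import deployment, flow_run
#     if anyio.run(cancel_if_overlapping, deployment.id, flow_run.id):
#         return
```

One caveat with this approach: the check and the cancel are not atomic, so two runs that start at almost the same moment can each see the other and both cancel. If that matters, the slot-based guard or the dedicated queue is probably the safer choice.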