Victor Longui
08/14/2025, 8:13 PM
Marvin
08/14/2025, 8:13 PM
Marvin
08/14/2025, 8:23 PM
prefect concurrency-limit create limit-concurrency-one 1
creates a tag-based limit that applies to task runs. The CLI even says so:
This limit controls how many task runs with that tag may simultaneously be in a Running state.
You added the tag to the deployment (which propagates to the flow run), but tag-based limits in 2.19 do not stop flow runs from starting; they control tasks within runs.
- Even where concurrency limits do apply, they put runs into AwaitingConcurrencySlot until a slot frees up. They do not auto-cancel the newcomer.
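The wait-versus-cancel distinction can be shown with a plain-Python analogy. This is not Prefect code: a threading.Semaphore stands in for a limit of 1, and the function names are made up for illustration. A blocking acquire behaves like a run held until a slot frees up, while a non-blocking acquire lets the newcomer give up immediately.

```python
import threading

# Stand-in for a concurrency limit of 1 (an analogy, not a Prefect object).
slot = threading.Semaphore(1)


def run_or_give_up(name: str) -> str:
    """Try to take the slot without waiting; report 'cancelled' if it is taken."""
    if not slot.acquire(blocking=False):
        return f"{name}: cancelled"
    try:
        return f"{name}: ran"
    finally:
        slot.release()


def run_or_wait(name: str, timeout: float) -> str:
    """Wait up to `timeout` seconds for the slot, like a run held until one frees up."""
    if not slot.acquire(timeout=timeout):
        return f"{name}: still waiting"
    try:
        return f"{name}: ran"
    finally:
        slot.release()
```

The limits described above behave like run_or_wait; getting the cancel behavior needs something closer to run_or_give_up, which is what the options below aim for.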
What to use instead (pick one)
1) Prevent overlap by queueing (simple, OSS-compatible)
- Give this deployment a dedicated work queue (or pool) and set its concurrency to 1. Only one flow run from that queue will be handed to a worker at a time.
- Create and limit the queue:
prefect work-queue create serial-long-running-test
prefect work-queue set-concurrency-limit serial-long-running-test 1
- Point your deployment at that queue (set work_queue_name on the deployment).
- Docs: https://docs-2.prefect.io/latest/concepts/work-pools/
2) Cancel the new run automatically (Prefect Cloud)
- Use an Automation that cancels new runs when another run of the same deployment is Pending/Running.
- Create a metric-based automation that triggers when more than one flow run of this deployment is in a Pending or Running state, with a CancelFlowRun action on the newest run.
- Docs: https://docs-2.prefect.io/latest/concepts/automations/
3) Cancel in-code at flow start (OSS or Cloud)
- Add a guard at the beginning of the flow that either
a) Tries to acquire a named concurrency slot immediately (timeout 0) and cancels if unavailable, or
b) Queries for other runs of the same deployment in Pending/Running and cancels if found.
Example using a concurrency slot (fast and simple):
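from prefect import flow, get_client, get_run_logger
from prefect.concurrency.sync import concurrency
from prefect.runtime import flow_run
from prefect.states import Cancelled
import anyio

@flow
def long_running_flow():
    logger = get_run_logger()
    try:
        # Assumes a global concurrency limit with this name exists, e.g.:
        #   prefect gcl create limit-concurrency-one --limit 1
        # (the tag-based limit from `prefect concurrency-limit create` is a different object).
        # timeout_seconds=0 means "do not wait"; strict=True is meant to raise if the
        # limit is missing (check that your Prefect version accepts it).
        with concurrency("limit-concurrency-one", timeout_seconds=0, strict=True):
            # do the real work here
            ...
    except TimeoutError:
        # No slot was free, so another run holds it.
        # Catching only TimeoutError keeps failures in the real work from being
        # reported as "another run is active".
        logger.info("Another run is active; cancelling this run.")

        async def _cancel():
            async with get_client() as client:
                await client.set_flow_run_state(flow_run.id, Cancelled())

        anyio.run(_cancel)
        return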
If you prefer a query-based guard instead of a slot, we can share that snippet too.
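In the meantime, the decision part of a query-based guard can be sketched without the Prefect client. It assumes the flow has already fetched the Pending/Running runs of the same deployment by some means. RunInfo and should_cancel are hypothetical names, and the oldest-run-wins rule is an assumption added here so that two runs starting together do not cancel each other.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class RunInfo:
    """Hypothetical, minimal view of a flow run: its id and creation time."""
    run_id: str
    created: datetime


def should_cancel(current: RunInfo, active: list[RunInfo]) -> bool:
    """Return True if `current` should yield to another active run.

    `active` is the list of Pending/Running runs of the same deployment;
    it may or may not include `current` itself.
    """
    others = [r for r in active if r.run_id != current.run_id]
    if not others:
        return False
    # The oldest run keeps going; ties on creation time are broken by run id.
    oldest = min(others + [current], key=lambda r: (r.created, r.run_id))
    return oldest.run_id != current.run_id
```

A flow would call should_cancel at the start and, when it returns True, cancel itself the same way the slot-based example does.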
References (just in case you enjoy reading as much as I do)
- Concurrency limits (2.x): https://docs-2.prefect.io/latest/guides/global-concurrency-limits/
- Task tags/concurrency: https://docs-2.prefect.io/latest/concepts/tasks/#configuring-concurrency-limits
- Automations (Cloud): https://docs-2.prefect.io/latest/concepts/automations/
A couple of clarifications to tailor the fix:
- Are you on Prefect Cloud or OSS?
- Do you want the second run to be cancelled or just queued?
- Can this deployment use a dedicated work queue/pool? If yes, the “concurrency=1 queue” route is the simplest.