Kartik
11/03/2025, 8:08 AM
Marvin
11/03/2025, 8:08 AM
Marvin
11/03/2025, 8:23 AM
- With `limit: 1` and `collision_strategy: ENQUEUE`:
- New scheduled runs are created on time but will sit in an AwaitingConcurrencySlot state until the current run finishes. They may show as “Late” if the scheduled time is in the past, then start as soon as a slot opens.
- Only one run will be actively running at any time. If your flow often runs longer than 30 minutes, you’ll build up a backlog that executes serially.
- With limit: 1 and `collision_strategy: CANCEL_NEW`:
- Any run scheduled while another is still running is immediately cancelled. This prevents backlog and enforces “drop overlapping windows.”
How to implement in your prefect.yaml
Add a concurrency_limit block to the deployment and choose your strategy.
To queue overlapping runs (single-flight with backlog):
- name: '[Tarinika-Inc(US Store)]: Clickstream Aggregation'
  version: null
  tags: ['clickstream-aggregation', 'tarinika-inc', 'prod']
  concurrency_limit:
    limit: 1
    collision_strategy: ENQUEUE
  description: |-
    This runs the dbt aggregation for all the clickstream models
    Args:
      database: Database name
      models: Specific models to run (default: 'constants')
      full_refresh: Whether to do full refresh (default: True)
      store_timezone_offset_minutes: Timezone offset (default: "330")
  entrypoint: flows/clickstream.py:clickstream_dbt_run
  parameters:
    database: "tarinika-inc"
    full_refresh: false
    store_timezone_offset_minutes: "-300"
  work_pool:
    name: local-work-pool
    work_queue_name: null
    job_variables: {}
  schedules:
  - interval: 1800.0
    anchor_date: '2025-09-09T08:06:20.631532+00:00'
    timezone: UTC
    active: true
To skip overlapping runs instead:
concurrency_limit:
  limit: 1
  collision_strategy: CANCEL_NEW
A couple of nuances to be aware of
- Your schedule is currently `active: false`; set it to `true` (as in the snippet above) so runs are actually scheduled.
- Backlog behavior (ENQUEUE): if a run takes longer than 30 minutes, the next scheduled run will wait; if this keeps happening, you’ll process runs serially and catch up over time. This is expected.
- Skipping behavior (CANCEL_NEW): scheduled runs that collide are cancelled and will not run unless manually triggered later.
- Work pool concurrency limits are separate. If you also set a work pool limit (e.g., prefect work-pool set-concurrency-limit) it will constrain all deployments in that pool. For per-deployment single-flight, prefer the deployment-level concurrency_limit.
- Manual runs are also subject to the deployment’s concurrency limit.
- If a run gets stuck, queued runs won't start until you cancel/resolve the stuck run (a sketch for clearing one from Python follows this list).
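If a stuck run does end up holding the slot, you can cancel it from the UI or with prefect flow-run cancel <flow-run-id>. A rough Python equivalent using the standard Prefect 3.x client is sketched below; force=True bypasses normal orchestration rules, so reserve it for runs that are genuinely wedged:

import asyncio

from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def cancel_stuck_run(flow_run_id: str) -> None:
    # Force the run into a Cancelled state so queued runs can claim the
    # concurrency slot. force=True skips orchestration checks (assumption:
    # acceptable here because the run is already stuck).
    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id=flow_run_id, state=Cancelled(), force=True
        )

asyncio.run(cancel_stuck_run("<flow-run-id>"))  # fill in the stuck run's ID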
Deploy your change
- After updating prefect.yaml, redeploy from your project root:
- prefect deploy (we verified this command exists)
Docs for reference
- Prefect YAML (deployment fields, including concurrency): prefect.yaml guide
- States and scheduling behavior (Scheduled, Late, AwaitingConcurrencySlot): States, Schedules
- Work pools and their concurrency: Work pools
If you share whether you want backlog (queue) or skip behavior, I can tailor the exact config and any monitoring tips for your case.
Kartik
11/03/2025, 9:06 AM
# concurrency limiting
concurrency_limit: int | None = None
concurrency_options: ConcurrencyOptions(collision_strategy=Literal['ENQUEUE', 'CANCEL_NEW']) | None = None
Marvin
11/03/2025, 9:06 AM
Marvin
11/03/2025, 9:10 AM
Those are the underlying deployment fields: concurrency_limit (int) and concurrency_options (with collision_strategy). However, in prefect.yaml the canonical way to express this is a single nested concurrency_limit block that the deploy command translates into those two fields for you.
So in prefect.yaml, this is the correct shape:
concurrency_limit:
  limit: 1
  collision_strategy: ENQUEUE  # or CANCEL_NEW
Authoritative reference: prefect.yaml guide (see the deployment fields table for concurrency_limit with limit and collision_strategy).
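If you ever deploy from Python instead of prefect.yaml, the same setting can be passed to flow.deploy. A minimal sketch follows, assuming your Prefect 3.x release exposes ConcurrencyLimitConfig and ConcurrencyLimitStrategy under prefect.client.schemas.objects (check your installed version before relying on these names):

from prefect import flow
# Assumption: these objects live here in your Prefect 3.x release
from prefect.client.schemas.objects import ConcurrencyLimitConfig, ConcurrencyLimitStrategy

if __name__ == "__main__":
    # Load the flow from the project and create/update the deployment,
    # mirroring the concurrency_limit block from prefect.yaml.
    flow.from_source(
        source=".",  # project root containing flows/clickstream.py
        entrypoint="flows/clickstream.py:clickstream_dbt_run",
    ).deploy(
        name="[Tarinika-Inc(US Store)]: Clickstream Aggregation",
        work_pool_name="local-work-pool",
        concurrency_limit=ConcurrencyLimitConfig(
            limit=1,
            collision_strategy=ConcurrencyLimitStrategy.ENQUEUE,  # or CANCEL_NEW
        ),
    )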
Putting it all together for your use case
- You want single-flight execution: only one run at a time. Set limit: 1.
- Pick how to handle overlaps:
- ENQUEUE (default): the next scheduled run will wait (queued) until the current run finishes.
- CANCEL_NEW: the next scheduled run will be cancelled immediately if another run is still active.
Updated snippet (queue overlaps so you never run more than one at a time, but don’t drop runs):
- name: '[Tarinika-Inc(US Store)]: Clickstream Aggregation'
  version: null
  tags: ['clickstream-aggregation', 'tarinika-inc', 'prod']
  concurrency_limit:
    limit: 1
    collision_strategy: ENQUEUE
  description: |-
    This runs the dbt aggregation for all the clickstream models
    Args:
      database: Database name
      models: Specific models to run (default: 'constants')
      full_refresh: Whether to do full refresh (default: True)
      store_timezone_offset_minutes: Timezone offset (default: "330")
  entrypoint: flows/clickstream.py:clickstream_dbt_run
  parameters:
    database: "tarinika-inc"
    full_refresh: false
    store_timezone_offset_minutes: "-300"
  work_pool:
    name: local-work-pool
    work_queue_name: null
    job_variables: {}
  schedules:
  - interval: 1800.0
    anchor_date: '2025-09-09T08:06:20.631532+00:00'
    timezone: UTC
    active: true
If you’d rather drop overlaps instead of queueing:
concurrency_limit:
  limit: 1
  collision_strategy: CANCEL_NEW
What happens to scheduled runs with each strategy
- ENQUEUE (recommended for your “don’t start the next until the prior finishes” intent):
- New scheduled runs are created on time, then enter an AwaitingConcurrencySlot scheduled state until the current run completes. They may show as “Late” if their scheduled time has passed; they’ll start immediately when the slot opens.
- Only one run is active at any time; if runs often take >30 minutes, you’ll build a serial backlog.
- CANCEL_NEW:
- If a run is already active when the next one is scheduled, the new one is cancelled immediately (skipped). It won’t run unless you manually kick it off later.
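If you'd like to watch this from the API rather than the UI, here is a rough sketch that lists recent flow runs and their state names so you can spot the ones parked in AwaitingConcurrencySlot. It assumes the standard Prefect 3.x client; adjust the limit or add filters for your volume:

import asyncio

from prefect.client.orchestration import get_client

async def show_waiting_runs() -> None:
    async with get_client() as client:
        # Pull a recent window of flow runs and report the ones that are
        # scheduled but not yet running (including AwaitingConcurrencySlot).
        runs = await client.read_flow_runs(limit=50)
        for run in runs:
            if run.state_name in {"AwaitingConcurrencySlot", "Late", "Scheduled"}:
                print(run.name, run.state_name, run.expected_start_time)

asyncio.run(show_waiting_runs())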
Nuances to keep in mind
- Your schedule was `active: false`; set it to true to actually schedule runs.
- Backlog vs skip is a product decision: ENQUEUE preserves every interval; CANCEL_NEW enforces “latest-only” behavior.
- Work pool concurrency limits (set via CLI) are separate and constrain all deployments in that pool; for single-flight per-deployment, prefer the deployment-level concurrency_limit.
- Manual runs also respect the deployment’s concurrency limit.
- If a run gets stuck in Running, queued runs will not start until you cancel/resolve it.
Deploy and verify
- From your project root: prefect deploy
- List and inspect to confirm concurrency settings:
- prefect deployment ls
- prefect deployment inspect "<flow-name>/<deployment-name>"
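The same check can be done from Python if you prefer. A small sketch using the Prefect client is below; the printed fields mirror the deployment model quoted above (concurrency_limit and concurrency_options), though exact attribute names can vary slightly between 3.x releases:

import asyncio

from prefect.client.orchestration import get_client

async def check_deployment_concurrency() -> None:
    async with get_client() as client:
        dep = await client.read_deployment_by_name("<flow-name>/<deployment-name>")
        # Fields mirror the deployment model: concurrency_limit + concurrency_options
        print("concurrency_limit:", dep.concurrency_limit)
        print("concurrency_options:", dep.concurrency_options)

asyncio.run(check_deployment_concurrency())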
More background
- Schedules and states: Schedules and States (see Scheduled, Late, AwaitingConcurrencySlot)
- Work pools: Work pools
Marvin
11/03/2025, 9:10 AM
Kartik
11/03/2025, 9:34 AM
Marvin
11/03/2025, 9:34 AM
Marvin
11/03/2025, 9:40 AM
Option A (no-code): a work pool concurrency limit. Since all of these deployments run on the same work pool (local-work-pool), set a pool-wide concurrency limit. Workers will only start up to N flow runs across all deployments in that pool; the rest will wait.
- Command:
prefect work-pool set-concurrency-limit "local-work-pool" 4
- Verified CLI: prefect work-pool set-concurrency-limit exists. Ref: Work pools
How it behaves:
- At most 4 flow runs will be active across all your store deployments in that pool.
- Additional scheduled/triggered runs will sit in a scheduled state until a worker slot is available.
- Combine with per‑deployment concurrency_limit: { limit: 1, collision_strategy: ENQUEUE } for “single-flight per store + 4 total globally.”
Option B (code-based): Global Concurrency Limits (GCL)
- Use a named global limit and have each flow acquire a slot at the start.
- Create the limit:
prefect gcl create dbt-global --limit 4
- Verified CLI exists: prefect gcl create. Docs: GCL CLI and guide: Global concurrency limits
- Wrap your flow body to acquire the limit. Use sync or async depending on your flow:
Sync flow:
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def clickstream_dbt_run(...):  # your existing parameters
    # Take one of the "dbt-global" slots before doing any work
    with concurrency("dbt-global", occupy=1, strict=True):
        ...  # run dbt (e.g., dbt CLI) here

Async flow:
from prefect import flow
from prefect.concurrency.asyncio import concurrency

@flow
async def clickstream_dbt_run(...):  # your existing parameters
    # Same idea with the async context manager
    async with concurrency("dbt-global", occupy=1, strict=True):
        ...  # run dbt here
Notes for GCL:
- strict=True ensures the run fails if the named limit is missing (safer than silently not limiting).
- If all 4 slots are occupied, new runs will wait until a slot is free (or error if you set timeout_seconds; a sketch of that follows the commands below).
- You can manage/inspect with:
prefect gcl ls
prefect gcl inspect dbt-global
prefect gcl update dbt-global --limit 6
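For completeness, here is a rough sketch of the timeout_seconds variant mentioned above: instead of waiting indefinitely for a dbt-global slot, the run gives up after 10 minutes. The exact exception raised on timeout can differ between 3.x releases; catching the broad TimeoutError below is an assumption, so confirm it against your installed version:

from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def clickstream_dbt_run_with_timeout():
    try:
        # Wait at most 600 seconds for one of the 4 "dbt-global" slots
        with concurrency("dbt-global", occupy=1, strict=True, timeout_seconds=600):
            ...  # run dbt here
    except TimeoutError:
        # Assumption: the acquisition timeout surfaces as a TimeoutError subclass
        print("No dbt-global slot freed up within 10 minutes; skipping this window.")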
Which should you choose?
- If your only goal is “no more than 4 flows across all stores” and they all use the same pool, the work pool concurrency limit is the simplest, requires no code, and is easy to reason about.
- If you need to coordinate concurrency across multiple pools or want fine-grained, code-level throttling (e.g., only certain sections acquire slots), use GCL.
- For “no overlap per store” + “max 4 globally,” you can combine:
- Per-deployment: concurrency_limit: { limit: 1, collision_strategy: ENQUEUE }
- Global cap: either Work Pool limit of 4 (simpler) or a GCL of 4 with the code wrapper.
If you tell me whether you prefer no-code (work pool limit) or code-based (GCL), I can provide the exact final snippets/commands for your repo and pool names.