# marvin-ai
k
@Marvin this is the prefect.yaml configuration for one of my deployments:

- name: '[Tarinika-Inc(US Store)]: Clickstream Aggregation'
  version: null
  tags: ['clickstream-aggregation', 'tarinika-inc', 'prod']
  concurrency_limit: null
  description: |-
    This runs the dbt aggregation for all the clickstream models
    Args:
        database: Database name
        models: Specific models to run (default: 'constants')
        full_refresh: Whether to do full refresh (default: True)
        store_timezone_offset_minutes: Timezone offset (default: "330")
  entrypoint: flows/clickstream.py:clickstream_dbt_run
  parameters:
    database: "tarinika-inc"
    full_refresh: false
    store_timezone_offset_minutes: "-300"
  work_pool:
    name: local-work-pool
    work_queue_name: null
    job_variables: {}
  schedules:
  - interval: 1800.0
    anchor_date: '2025-09-09T08:06:20.631532+00:00'
    timezone: UTC
    active: false

Here I have defined the schedule with an interval of 30 minutes. What I want is that, even if the next scheduled time arrives, the next run should not execute unless the previous run has completed. I think this can be achieved by setting the concurrency limit to 1; I just want to know what will happen to the scheduled runs in that case and how to implement it. My idea is simple: run the flow on a 30-minute interval, but if a flow run does not complete within 30 minutes, the next scheduled run should not execute, and only one flow run should be actively running at a time, which I believe the concurrency limit can achieve. Please also explain any nuances in this.
m
thought for 919.1 seconds
Great question: you’re thinking about this the right way.

Short answer
- Yes: set a deployment-level concurrency limit of 1.
- Then pick what should happen when a new scheduled run collides with an active run:
  - ENQUEUE (default): queue the new run to wait its turn
  - CANCEL_NEW: immediately cancel the new run (i.e., “skip” overlapping schedules)

What happens to scheduled runs
- With `limit: 1` and `collision_strategy: ENQUEUE`:
  - New scheduled runs are created on time but will sit in an `AwaitingConcurrencySlot` state until the current run finishes. They may show as “Late” if the scheduled time is in the past, then start as soon as a slot opens.
  - Only one run will be actively running at any time. If your flow often runs longer than 30 minutes, you’ll build up a backlog that executes serially.
- With `limit: 1` and `collision_strategy: CANCEL_NEW`:
  - Any run scheduled while another is still running is immediately cancelled. This prevents backlog and enforces “drop overlapping windows.”

How to implement in your prefect.yaml
Add a `concurrency_limit` block to the deployment and choose your strategy.

To queue overlapping runs (single-flight with backlog):
Copy code
- name: '[Tarinika-Inc(US Store)]: Clickstream Aggregation'
  version: null
  tags: ['clickstream-aggregation', 'tarinika-inc', 'prod']
  concurrency_limit:
    limit: 1
    collision_strategy: ENQUEUE
  description: |-
    This runs the dbt aggregation for all the clickstream models
    Args:
        database: Database name
        models: Specific models to run (default: 'constants')
        full_refresh: Whether to do full refresh (default: True)
        store_timezone_offset_minutes: Timezone offset (default: "330")
  entrypoint: flows/clickstream.py:clickstream_dbt_run
  parameters:
    database: "tarinika-inc"
    full_refresh: false
    store_timezone_offset_minutes: "-300"
  work_pool:
    name: local-work-pool
    work_queue_name: null
    job_variables: {}
  schedules:
  - interval: 1800.0
    anchor_date: '2025-09-09T08:06:20.631532+00:00'
    timezone: UTC
    active: true
To skip overlapping runs instead:
Copy code
concurrency_limit:
  limit: 1
  collision_strategy: CANCEL_NEW
A couple of nuances to be aware of
- Your schedule is currently `active: false`; set it to `true` to enable the schedule.
- Backlog behavior (ENQUEUE): if a run takes longer than 30 minutes, the next scheduled run will wait; if this keeps happening, you’ll process runs serially and catch up over time. This is expected.
- Skipping behavior (CANCEL_NEW): scheduled runs that collide are cancelled and will not run unless manually triggered later.
- Work pool concurrency limits are separate. If you also set a work pool limit (e.g., `prefect work-pool set-concurrency-limit`), it will constrain all deployments in that pool. For per-deployment single-flight, prefer the deployment-level `concurrency_limit`.
- Manual runs are also subject to the deployment’s concurrency limit (a small sketch follows at the end of this message).
- If a run gets stuck, queued runs won’t start until you cancel/resolve the stuck run.

Deploy your change
- After updating `prefect.yaml`, redeploy from your project root: `prefect deploy` (we verified this command exists)

Docs for reference
- Prefect YAML (deployment fields, including concurrency): prefect.yaml guide
- States and scheduling behavior (Scheduled, Late, AwaitingConcurrencySlot): States, Schedules
- Work pools and their concurrency: Work pools

If you share whether you want backlog (queue) or skip behavior, I can tailor the exact config and any monitoring tips for your case.
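On that note about manual runs: a quick, hedged sketch of an ad-hoc trigger. It goes through the same deployment concurrency limit as scheduled runs; the flow name "clickstream-dbt-run" is inferred from your entrypoint's function name, so adjust if yours differs.
Copy code
from prefect.deployments import run_deployment

# Trigger the deployment manually; with limit: 1 this run also waits for the
# single concurrency slot (or is handled according to your collision strategy).
flow_run = run_deployment(
    name="clickstream-dbt-run/[Tarinika-Inc(US Store)]: Clickstream Aggregation",
    timeout=0,  # return immediately instead of blocking until the run finishes
)
print(flow_run.id, flow_run.state)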
k
@Marvin I don't think this is the right way to write the configuration:

concurrency_limit:
  limit: 1
  collision_strategy: 'CANCEL_NEW'

Looking at the deployment class, I can see that it defines the concurrency limiting in this manner:
Copy code
# concurrency limiting
    concurrency_limit: int | None = None
    concurrency_options: ConcurrencyOptions(collision_strategy=Literal['ENQUEUE', 'CANCEL_NEW']) | None = None
m
thought for 232.4 seconds
You’re right about what you’re seeing in the server model: under the hood, deployments have two separate fields, `concurrency_limit` (int) and `concurrency_options` (with `collision_strategy`). However, in prefect.yaml the canonical way to express this is a single nested `concurrency_limit` block that the deploy command translates into those two fields for you.

So in prefect.yaml, this is the correct shape:
Copy code
concurrency_limit:
  limit: 1
  collision_strategy: ENQUEUE   # or CANCEL_NEW
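(Purely for reference, if you ever deploy from Python instead of prefect.yaml: recent Prefect 3.x versions expose the same two settings through the `.deploy()` API. The snippet below is a hedged sketch that assumes your installed version provides `ConcurrencyLimitConfig`/`ConcurrencyLimitStrategy` and the `flow.from_source(...).deploy(...)` pattern; verify against your Prefect version before relying on it.)
Copy code
from prefect import flow
from prefect.client.schemas.objects import ConcurrencyLimitConfig, ConcurrencyLimitStrategy

# Python-side equivalent of the nested prefect.yaml block: a limit plus a collision strategy.
flow.from_source(
    source=".",  # project root containing flows/clickstream.py
    entrypoint="flows/clickstream.py:clickstream_dbt_run",
).deploy(
    name="[Tarinika-Inc(US Store)]: Clickstream Aggregation",
    work_pool_name="local-work-pool",
    concurrency_limit=ConcurrencyLimitConfig(
        limit=1, collision_strategy=ConcurrencyLimitStrategy.ENQUEUE
    ),
)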
Authoritative reference: the prefect.yaml guide (see the deployment fields table for `concurrency_limit` with `limit` and `collision_strategy`).

Putting it all together for your use case
- You want single-flight execution: only one run at a time. Set `limit: 1`.
- Pick how to handle overlaps:
  - ENQUEUE (default): the next scheduled run will wait (queued) until the current run finishes.
  - CANCEL_NEW: the next scheduled run will be cancelled immediately if another run is still active.

Updated snippet (queue overlaps so you never run more than one at a time, but don’t drop runs):
Copy code
- name: '[Tarinika-Inc(US Store)]: Clickstream Aggregation'
  version: null
  tags: ['clickstream-aggregation', 'tarinika-inc', 'prod']
  concurrency_limit:
    limit: 1
    collision_strategy: ENQUEUE
  description: |-
    This runs the dbt aggregation for all the clickstream models
    Args:
        database: Database name
        models: Specific models to run (default: 'constants')
        full_refresh: Whether to do full refresh (default: True)
        store_timezone_offset_minutes: Timezone offset (default: "330")
  entrypoint: flows/clickstream.py:clickstream_dbt_run
  parameters:
    database: "tarinika-inc"
    full_refresh: false
    store_timezone_offset_minutes: "-300"
  work_pool:
    name: local-work-pool
    work_queue_name: null
    job_variables: {}
  schedules:
  - interval: 1800.0
    anchor_date: '2025-09-09T08:06:20.631532+00:00'
    timezone: UTC
    active: true
If you’d rather drop overlaps instead of queueing:
Copy code
concurrency_limit:
  limit: 1
  collision_strategy: CANCEL_NEW
What happens to scheduled runs with each strategy
- ENQUEUE (recommended for your “don’t start the next until the prior finishes” intent):
  - New scheduled runs are created on time, then enter an AwaitingConcurrencySlot scheduled state until the current run completes. They may show as “Late” if their scheduled time has passed; they’ll start immediately when the slot opens.
  - Only one run is active at any time; if runs often take more than 30 minutes, you’ll build a serial backlog.
- CANCEL_NEW:
  - If a run is already active when the next one is scheduled, the new one is cancelled immediately (skipped). It won’t run unless you manually kick it off later.

Nuances to keep in mind
- Your schedule was `active: false`; set it to `true` to actually schedule runs.
- Backlog vs. skip is a product decision: ENQUEUE preserves every interval; CANCEL_NEW enforces “latest-only” behavior.
- Work pool concurrency limits (set via CLI) are separate and constrain all deployments in that pool; for single-flight per-deployment, prefer the deployment-level `concurrency_limit`.
- Manual runs also respect the deployment’s concurrency limit.
- If a run gets stuck in Running, queued runs will not start until you cancel/resolve it.

Deploy and verify
- From your project root: `prefect deploy`
- List and inspect to confirm concurrency settings (a Python alternative is sketched below):
  - `prefect deployment ls`
  - `prefect deployment inspect "<flow-name>/<deployment-name>"`

More background
- Schedules and states: Schedules and States (see Scheduled, Late, AwaitingConcurrencySlot)
- Work pools: Work pools
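If you’d rather confirm the applied settings from Python instead of the CLI, here is a rough sketch. It assumes the default flow name "clickstream-dbt-run" derived from your entrypoint function, and the exact field names in the output may vary slightly across Prefect versions.
Copy code
import asyncio

from prefect import get_client

async def show_deployment_concurrency() -> None:
    async with get_client() as client:
        dep = await client.read_deployment_by_name(
            "clickstream-dbt-run/[Tarinika-Inc(US Store)]: Clickstream Aggregation"
        )
        # Dump the full deployment record and look for the concurrency fields in it.
        print(dep.model_dump_json(indent=2))

asyncio.run(show_deployment_concurrency())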
If you tell me whether you prefer to queue or skip overlaps, I can suggest monitoring and alerting tips tailored to that choice.
k
@Marvin can you tell me how I can set a global concurrency control limit? The configuration I have is something like this: I have a dbt project that I orchestrate with Prefect, and I have to run the same dbt project over different stores (databases). For each store I have created a new deployment with the same entrypoint, which basically executes dbt run. I want to control the flows by setting a global concurrency limit so that at any time no more than 4 flows are running. I hope we can achieve this using the global concurrency limit.
m
thought for 318.6 seconds
You’ve got two good ways to cap “at most 4 flows running” across your store-specific deployments. Pick based on whether you want a no-code change or you’re okay modifying the flow.

Option A (recommended, no code): limit the work pool
- If all these deployments use the same work pool (you showed `local-work-pool`), set a pool-wide concurrency limit. Workers will only start up to N flow runs across all deployments in that pool; the rest will wait.
- Command:
Copy code
prefect work-pool set-concurrency-limit "local-work-pool" 4
- Verified CLI: `prefect work-pool set-concurrency-limit` exists. Ref: Work pools

How it behaves:
- At most 4 flow runs will be active across all your store deployments in that pool.
- Additional scheduled/triggered runs will sit in a scheduled state until a worker slot is available.
- Combine with per-deployment `concurrency_limit: { limit: 1, collision_strategy: ENQUEUE }` for “single-flight per store + 4 total globally.”

Option B (code-based): Global Concurrency Limits (GCL)
- Use a named global limit and have each flow acquire a slot at the start.
- Create the limit:
Copy code
prefect gcl create dbt-global --limit 4
- Verified CLI exists: `prefect gcl create`. Docs: GCL CLI and guide: Global concurrency limits
- Wrap your flow body to acquire the limit. Use sync or async depending on your flow; a more concrete dbt example follows the two snippets below.

Sync flow:
Copy code
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def clickstream_dbt_run(database, models="constants", full_refresh=True, store_timezone_offset_minutes="330"):
    # Hold one of the 4 global slots for as long as this block runs
    with concurrency("dbt-global", occupy=1, strict=True):
        # run dbt (e.g., the dbt CLI) here
        ...
Async flow:
Copy code
from prefect import flow
from prefect.concurrency.asyncio import concurrency

@flow
async def clickstream_dbt_run(database, models="constants", full_refresh=True, store_timezone_offset_minutes="330"):
    # Hold one of the 4 global slots for as long as this block runs
    async with concurrency("dbt-global", occupy=1, strict=True):
        # run dbt here
        ...
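To make the body a bit more concrete, here is a hedged sketch of shelling out to the dbt CLI inside the slot; how your project actually consumes the database and timezone parameters is an assumption, so adapt the dbt arguments to your setup.
Copy code
import subprocess

from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def clickstream_dbt_run(database, models="constants", full_refresh=True, store_timezone_offset_minutes="330"):
    # Only 4 of these blocks can run at once across every store's deployment.
    with concurrency("dbt-global", occupy=1, strict=True):
        cmd = ["dbt", "run", "--select", models]
        if full_refresh:
            cmd.append("--full-refresh")
        # Hypothetical wiring: pass store-specific values through dbt vars.
        cmd += ["--vars", f"{{database: {database}, store_timezone_offset_minutes: {store_timezone_offset_minutes}}}"]
        subprocess.run(cmd, check=True)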
Notes for GCL:
- `strict=True` ensures the run fails if the named limit is missing (safer than silently not limiting).
- If all 4 slots are occupied, new runs will wait until a slot is free (or error if you set `timeout_seconds`).
- You can manage/inspect with:
Copy code
prefect gcl ls
prefect gcl inspect dbt-global
prefect gcl update dbt-global --limit 6
Which should you choose?
- If your only goal is “no more than 4 flows across all stores” and they all use the same pool, the work pool concurrency limit is the simplest, requires no code, and is easy to reason about.
- If you need to coordinate concurrency across multiple pools or want fine-grained, code-level throttling (e.g., only certain sections acquire slots), use GCL.
- For “no overlap per store” + “max 4 globally,” you can combine:
  - Per-deployment: `concurrency_limit: { limit: 1, collision_strategy: ENQUEUE }`
  - Global cap: either a work pool limit of 4 (simpler) or a GCL of 4 with the code wrapper.

If you tell me whether you prefer no-code (work pool limit) or code-based (GCL), I can provide the exact final snippets/commands for your repo and pool names.