Martin Janiczek
10/10/2025, 8:45 AM
async with get_client() as client:
    await client.create_concurrency_limit(concurrency_limit_tag, 20)

client_page_numbers = range(0, ceil(client_invoices_count / pagination_limit))
result_futures = []
for page in client_page_numbers:
    result_futures.append(get_client_invoice_page.submit(page))
wait(result_futures)
results = [future.result() for future in result_futures]
Any ideas why the pauses are there?

Marvin
10/10/2025, 8:45 AM
Marvin
10/10/2025, 8:53 AM
A few clarifying questions:
- Is the task decorated with the limit's tag, i.e. @task(tags=[concurrency_limit_tag])?
- In the UI, do the “paused” tasks show state like AwaitingConcurrency or Scheduled far in the future?
- Are you on Prefect Cloud or OSS Server? If OSS, which server version?
- Are you creating the concurrency limit on every flow run (as in your snippet), or did you create it once out-of-band?
What commonly causes long pauses with GCL
- Leaked or stuck slots: If tasks crash/cancel or the process is interrupted, “active slots” can stay non-zero. When that happens, new tasks remain in AwaitingConcurrency until slots decay or are reset — which can look like multi-minute gaps.
- Slot decay is slow: If slot_decay_per_second is very low, it can take a long time to replenish slots after a spike or a crash. This can show up as 10–30 minute "batchy" behavior as the acquisition service cycles and tokens slowly return (rough arithmetic after this list).
- Creating the GCL inside the flow each run: This can unexpectedly reset parameters (like slot_decay_per_second) or the active slot count if there's a race with other runs, leading to odd acquisition behavior.
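As a rough back-of-the-envelope illustration of the decay timing (the numbers here are hypothetical, not taken from your setup):
# Hypothetical numbers: a 20-slot limit where all 20 slots have leaked,
# decaying at 0.01 slots per second.
leaked_active_slots = 20
slot_decay_per_second = 0.01

seconds_per_slot = 1 / slot_decay_per_second                        # 100 s before one slot frees up
seconds_until_empty = leaked_active_slots / slot_decay_per_second   # 2000 s, roughly 33 minutes

print(seconds_per_slot, seconds_until_empty)
With a decay rate that low (or zero), the recovery window stretches into exactly the kind of multi-minute pauses described above.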
Quick checks and fixes
1) Inspect your concurrency limit
- Check its current active slots and decay rate:
prefect gcl inspect <your-limit-name>
If you see active_slots near your limit (e.g., ~20) even when no tasks are running, they've likely leaked. Reset with:
prefect gcl update <your-limit-name> --active-slots 0
Optionally, set a faster decay to recover from leaks more gracefully:
prefect gcl update <your-limit-name> --slot-decay-per-second 1
Docs: Global concurrency limits
2) Ensure tasks are actually using that limit
Your task must be tagged with the concurrency limit name:
from prefect import task
@task(tags=["client-invoices"]) # must match the GCL name you created
def get_client_invoice_page(page): ...
And create the limit once (ideally outside the flow):
prefect gcl create client-invoices --limit 20
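Putting those two pieces together, a minimal sketch of the tagged-task setup (the flow name fetch_invoices and the page_count parameter are illustrative, not taken from your code):
from prefect import flow, task

@task(tags=["client-invoices"])  # tag must match the GCL name exactly
def get_client_invoice_page(page):
    ...

@flow
def fetch_invoices(page_count: int):
    # Each submitted run must acquire a "client-invoices" slot before it starts
    futures = [get_client_invoice_page.submit(p) for p in range(page_count)]
    return [f.result() for f in futures]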
3) Don’t create the GCL inside the flow
Move this out of your flow:
async with get_client() as client:
    await client.create_concurrency_limit(concurrency_limit_tag, 20)
Create/update it once via CLI or a separate management script so you don’t inadvertently reset parameters or race with other runs.
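For example, a one-off management script along these lines (a sketch that reuses the same client call from your snippet; the name "client-invoices" and the limit of 20 are just examples):
import asyncio

from prefect import get_client

async def main():
    async with get_client() as client:
        # Create the limit once, outside of any flow run
        await client.create_concurrency_limit("client-invoices", 20)

asyncio.run(main())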
4) If you only need per-flow concurrency (not cross-flow/global)
Skip GCL entirely and use a task runner to cap concurrency without orchestration:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def my_flow(pages):
    futures = [get_client_invoice_page.submit(p) for p in pages]
    # wait and collect as you are now
This removes the global service and eliminates the “AwaitingConcurrency” pauses.
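For the "wait and collect" part, the pattern from your original snippet carries over unchanged (assuming wait comes from prefect.futures):
from prefect.futures import wait

futures = [get_client_invoice_page.submit(p) for p in pages]
wait(futures)  # block until every submitted task has finished
results = [future.result() for future in futures]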
5) Quick A/B: temporarily disable the GCL
Disabling the limit will confirm if the pauses are due to the concurrency system:
prefect gcl disable <your-limit-name>
Re-run the flow; if the pauses disappear, we’ve isolated the cause to the GCL.
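Afterwards you can turn the limit back on; assuming your CLI version has the matching subcommand, that's prefect gcl enable <your-limit-name>.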
Why this may appear after 3.4.22
I couldn’t find a documented regression between 3.4.11 and 3.4.22 that changes acquisition timing. The most common real-world cause is either:
- Slot leakage (active slots not released), plus slow/zero slot decay
- Re-creating the GCL inside the flow which unintentionally alters its state across runs
If you can share:
- The task decorator (showing tags)
- A screenshot/snippet of a paused task’s state history (e.g., AwaitingConcurrency with next scheduled time)
- Output of prefect gcl inspect <name>
Marvin
10/10/2025, 8:53 AM
- prefect gcl --help
- prefect gcl inspect --help
- prefect gcl create --help
If it turns out to be a reproducible regression, I'll help craft a minimal repro we can escalate.

Martin Janiczek
10/10/2025, 9:48 AM
I've removed the create call inside the flow, and I've set the slot decay via the UI (the Python API doesn't seem to support this).
I've tried to use rate_limit(), but that doesn't seem to do anything.
ThreadPoolTaskRunner seems to have fixed the issue though, so thank you, @Marvin!

Marvin
10/10/2025, 9:48 AM
Marvin
10/10/2025, 9:52 AM
- You can also set slot decay from the CLI:
prefect gcl update <your-limit-name> --slot-decay-per-second 1
Docs: Global concurrency limits
- Why ThreadPoolTaskRunner “fixes” it: it enforces local concurrency within the flow process, avoiding the global orchestration queue for concurrency slots. That eliminates the “AwaitingConcurrency” pauses from GCLs and any slot leakage/decay quirks. If you intend to rely only on the task runner, make sure your task isn’t tagged with the GCL name anymore; otherwise the global limit will still gate starts.
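A minimal sketch of what "untagged" looks like here (assuming the limit was previously attached via tags=["client-invoices"]):
from prefect import task

@task  # no GCL tag; only ThreadPoolTaskRunner(max_workers=...) caps concurrency now
def get_client_invoice_page(page):
    ...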
About rate_limit()
- It’s an async context manager that acquires from named limits:
from prefect import task
from prefect.concurrency.asyncio import rate_limit

@task
async def do_work(i):
    async with rate_limit("client-invoices", timeout_seconds=300):
        # your API call or work here
        ...
- For it to “do something” across processes/flows, there must be a matching Global Concurrency Limit configured (same name), with a limit and a slot_decay_per_second that expresses the rate you want. A common setup:
- limit = the max burst you’ll allow
- slot_decay_per_second = your sustained rate (tokens per second)
If the limit doesn't exist or the names don't match, it'll appear to have no effect (a concrete example command follows after this list).
- If your task is synchronous, use the sync variant:
from prefect import task
from prefect.concurrency.sync import rate_limit

@task
def do_work(i):
    with rate_limit("client-invoices", timeout_seconds=300):
        ...
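As a concrete (hypothetical) example of that limit/decay mapping, allowing bursts of up to 20 with a sustained rate of roughly 5 acquisitions per second:
prefect gcl create client-invoices --limit 20 --slot-decay-per-second 5
This assumes your CLI version accepts --slot-decay-per-second on create; otherwise set it with prefect gcl update as shown earlier.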
Given ThreadPoolTaskRunner fixed your pauses, you may not need GCL/rate limiting unless you're coordinating throughput across multiple flows or workers. If you do want a cross-flow RPS cap, keep the GCL and wrap the critical section with rate_limit() as above.
If you want, share your current task decorator (tags) and GCL config and I can sanity check the setup.