# ask-marvin
@Marvin My Prefect flow with many tasks (~500) is having weird pauses between their runs (e.g. waiting 30 min before a task starts). I think this started happening after updating from Prefect 3.4.11 to 3.4.22. The flow uses the default task runner and the tasks have a concurrency limit of 20. Here's the relevant part of the code:
Copy code
    async with get_client() as client:
        await client.create_concurrency_limit(concurrency_limit_tag, 20)

    client_page_numbers = range(0, ceil(client_invoices_count / pagination_limit))
    result_futures = []
    for page in client_page_numbers:
        result_futures.append(get_client_invoice_page.submit(page))
    wait(result_futures)
    results = [future.result() for future in result_futures]
Any ideas why the pauses are there?
thought for 500.6 seconds
Thanks for the details. A few things jump out that can cause exactly the "long pauses" you're seeing when using global concurrency limits with many submitted tasks. First, a couple of clarifying questions to narrow it down:
- Are your tasks tagged with the same name as the concurrency limit? e.g. @task(tags=[concurrency_limit_tag])
- In the UI, do the "paused" tasks show a state like AwaitingConcurrency or Scheduled far in the future?
- Are you on Prefect Cloud or OSS Server? If OSS, which server version?
- Are you creating the concurrency limit on every flow run (as in your snippet), or did you create it once out-of-band?

What commonly causes long pauses with GCL
- Leaked or stuck slots: If tasks crash/cancel or the process is interrupted, "active slots" can stay non-zero. When that happens, new tasks remain in AwaitingConcurrency until slots decay or are reset, which can look like multi-minute gaps.
- Slot decay is slow: If slot_decay_per_second is very low, it can take a long time to replenish slots after a spike or a crash. This can show up as 10–30 minute "batchy" behavior as the acquisition service cycles and tokens slowly return (see the quick arithmetic below).
- Creating the GCL inside the flow each run: This can unexpectedly reset parameters (like slot_decay_per_second) or the active slot count if there's a race with other runs, leading to odd acquisition behavior.
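To make that concrete, here's a back-of-the-envelope calculation in plain Python (the numbers are hypothetical, not read from your setup): leaked slots only come back at the decay rate, so a handful of stuck slots plus a slow decay is enough to produce ~30-minute gaps.
Copy code
# Hypothetical numbers, just to show the shape of the problem.
leaked_slots = 20               # e.g. all 20 slots stuck "active" after a crash
slot_decay_per_second = 0.01    # a very slow decay setting

seconds_until_first_slot = 1 / slot_decay_per_second            # 100 s
seconds_until_all_free = leaked_slots / slot_decay_per_second   # 2000 s ~= 33 min

print(f"first slot frees after ~{seconds_until_first_slot:.0f} s; "
      f"all free after ~{seconds_until_all_free / 60:.0f} min")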
Quick checks and fixes
1) Inspect your concurrency limit
Check its current active slots and decay rate:
Copy code
prefect gcl inspect <your-limit-name>
If you see active_slots near your limit (e.g., ~20) even when no tasks are running, they've likely leaked. Reset with:
Copy code
prefect gcl update <your-limit-name> --active-slots 0
Optionally, set a faster decay to recover from leaks more gracefully:
Copy code
prefect gcl update <your-limit-name> --slot-decay-per-second 1
Docs: Global concurrency limits
2) Ensure tasks are actually using that limit
Your task must be tagged with the concurrency limit name:
Copy code
from prefect import task

@task(tags=["client-invoices"])  # must match the GCL name you created
def get_client_invoice_page(page): ...
And create the limit once (ideally outside the flow):
Copy code
prefect gcl create client-invoices --limit 20
3) Don’t create the GCL inside the flow Move this out of your flow:
Copy code
async with get_client() as client:
    await client.create_concurrency_limit(concurrency_limit_tag, 20)
Create/update it once via CLI or a separate management script so you don't inadvertently reset parameters or race with other runs; a sketch of such a script is below.
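For example, a minimal one-off setup script (a sketch reusing the client.create_concurrency_limit call from your snippet; the tag name "client-invoices" is a placeholder for whatever your tasks are actually tagged with):
Copy code
import asyncio

from prefect import get_client

async def main() -> None:
    # Run once, out-of-band, so individual flow runs never create or reset the limit.
    async with get_client() as client:
        await client.create_concurrency_limit("client-invoices", 20)

if __name__ == "__main__":
    asyncio.run(main())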
4) If you only need per-flow concurrency (not cross-flow/global)
Skip GCL entirely and use a task runner to cap concurrency without orchestration:
Copy code
from prefect import flow
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def my_flow(...):
    futures = [get_client_invoice_page.submit(p) for p in pages]
    # wait and collect as you are now
    wait(futures)
    results = [future.result() for future in futures]
This removes the global service and eliminates the "AwaitingConcurrency" pauses.
5) Quick A/B: temporarily disable the GCL
Disabling the limit will confirm if the pauses are due to the concurrency system:
Copy code
prefect gcl disable <your-limit-name>
Re-run the flow; if the pauses disappear, we've isolated the cause to the GCL.

Why this may appear after 3.4.22
I couldn't find a documented regression between 3.4.11 and 3.4.22 that changes acquisition timing. The most common real-world cause is either:
- Slot leakage (active slots not released), plus slow/zero slot decay
- Re-creating the GCL inside the flow, which unintentionally alters its state across runs

If you can share:
- The task decorator (showing tags)
- A screenshot/snippet of a paused task's state history (e.g., AwaitingConcurrency with next scheduled time)
- Output of prefect gcl inspect <name>
…I can help pinpoint whether this is slot leakage/decay, tag mismatch, or something else.

Reference CLI help we verified:
- prefect gcl --help
- prefect gcl inspect --help
- prefect gcl create --help
If it turns out to be a reproducible regression, I’ll help craft a minimal repro we can escalate.
m
I've removed the concurrency limit create call inside the flow, and I've set the slot decay via the UI (the Python API doesn't seem to support this). I've tried to use rate_limit(), but that doesn't seem to do anything. ThreadPoolTaskRunner seems to have fixed the issue though, so thank you, @Marvin!
thought for 220.2 seconds
Awesome, glad switching to ThreadPoolTaskRunner smoothed it out! A couple of follow-ups to your notes:
- You're right: setting slot decay isn't exposed in the Python client right now. Use the UI or CLI instead. For CLI:
Copy code
prefect gcl update <your-limit-name> --slot-decay-per-second 1
Docs: Global concurrency limits
- Why ThreadPoolTaskRunner "fixes" it: it enforces local concurrency within the flow process, avoiding the global orchestration queue for concurrency slots. That eliminates the "AwaitingConcurrency" pauses from GCLs and any slot leakage/decay quirks. If you intend to rely only on the task runner, make sure your task isn't tagged with the GCL name anymore; otherwise the global limit will still gate starts (see the sketch just below).
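For instance, a minimal sketch (assuming the tag/limit name "client-invoices" used earlier in this thread) is simply the task without the tag:
Copy code
from prefect import task

# No tags=["client-invoices"]: with the tag gone, the global/tag-based limit
# no longer gates this task; only the ThreadPoolTaskRunner's max_workers caps
# concurrency within the flow run.
@task
def get_client_invoice_page(page):
    ...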
About rate_limit()
- It's an async call you await before the rate-limited work, not a context manager; it blocks until the named limit grants a slot:
Copy code
from prefect import task
from prefect.concurrency.asyncio import rate_limit

@task
async def do_work(i):
    # Blocks until the "client-invoices" limit grants a slot; slots are
    # released by the limit's decay rather than on exit.
    await rate_limit("client-invoices", timeout_seconds=300)
    # your API call or work here
    ...
- For it to "do something" across processes/flows, there must be a matching Global Concurrency Limit configured (same name), with a limit and a slot_decay_per_second that expresses the rate you want. A common setup:
  - limit = the max burst you'll allow
  - slot_decay_per_second = your sustained rate (tokens per second)
  For example, limit 20 with slot_decay_per_second 1 allows bursts of up to 20 calls and a sustained rate of about 1 call per second. If the limit doesn't exist or the names don't match, it'll appear to have no effect.
- If your task is synchronous, use the sync variant:
Copy code
from prefect import task
from prefect.concurrency.sync import rate_limit

@task
def do_work(i):
    # Same idea, but called directly in synchronous code.
    rate_limit("client-invoices", timeout_seconds=300)
    ...
Given ThreadPoolTaskRunner fixed your pauses, you may not need GCL/rate limiting unless you're coordinating throughput across multiple flows or workers. If you do want a cross-flow RPS cap, keep the GCL and call rate_limit() before the critical section, as above. If you want, share your current task decorator (tags) and GCL config and I can sanity-check the setup.