Robyn H
08/11/2025, 3:49 PM
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS=1.
In my test flow, if I add a tag-based concurrency limit on one task, I notice the execution time of the flow increasing significantly. I'm testing it on a flow where I use asyncio to start many executions of task1 concurrently (I'm using the default task runner), and task1 just calls task2, which has a tag with a concurrency limit. Even if I set the number of slots in the concurrency limit higher than the number of tasks (e.g. 1000 tasks, 1001 slots), I see a significant runtime increase (from 7s to 33s, in my case). I assume this increased latency is due to database round trips and lock contention on acquiring slots. Is this correct?
In this test, we're not even waiting on slots to become free: there are always free slots. I set the slot value this high to verify my suspicion that we're seeing significant latency despite not actually waiting for tasks to free up slots. With 50k tasks, for example, the execution time goes from 5min to 30min.
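As a rough back-of-the-envelope check on the two measurements above, the added time can be divided by the task count. This is only a sketch: it assumes the extra time scales linearly with the number of tasks, which two data points suggest but do not prove, and the helper name `added_ms_per_task` is made up for illustration.

```python
# Figures quoted in the message above; everything else is simple arithmetic.
runs = {
    # task count: (seconds without the limit, seconds with the limit)
    1_000: (7.0, 33.0),
    50_000: (5 * 60.0, 30 * 60.0),
}


def added_ms_per_task(n_tasks, baseline_s, limited_s):
    """Extra wall-clock time per task, amortized over the run, in milliseconds."""
    return (limited_s - baseline_s) / n_tasks * 1000.0


overhead = {n: added_ms_per_task(n, *times) for n, times in runs.items()}
for n, ms in overhead.items():
    print(f"{n} tasks: about {ms:.0f} ms added per task")
```

Both runs come out in the same range (roughly 26 ms and 30 ms per task), which would be consistent with a fairly constant cost per slot acquisition rather than time spent waiting for slots. Because the tasks run concurrently, this is an amortized wall-clock figure, not the latency of any single acquisition.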
What I'm wondering is if there is any way to optimize this so we don't see such major increases? Can I tune my database?
Brendan Dalpe
08/11/2025, 3:55 PM
Robyn H
08/11/2025, 3:57 PM
Brendan Dalpe
08/11/2025, 4:09 PM
Joe Mesterhazy
08/12/2025, 5:18 PM
@task all fall apart if you are launching thousands of short-lived tasks all at once. The backend spends more time housekeeping than actually running the tasks (also, the UI grinds to a halt trying to render them). I would love to be wrong about this! (My testing was all self-hosted in GKE + Postgres.)
Robyn H
08/13/2025, 12:13 PM
Joe Mesterhazy
08/13/2025, 4:55 PM
Brendan Dalpe
08/13/2025, 7:38 PM
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS is going to be really disadvantageous here because only one of your tasks needs concurrency. Setting this env means every task will query the concurrency limits.
I agree with @Joe Mesterhazy here about taking a look at using concurrency or rate_limit to accomplish your requirement around throttling the single task: https://docs.prefect.io/v3/how-to-guides/workflows/global-concurrency-limits#using-rate-limit
As an example, this would rate limit only inside the specific task:
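from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task
def make_http_request():
    # Blocks until the "rate-limited-api" limit grants a slot.
    rate_limit("rate-limited-api")
    print("Making an HTTP request...")


@flow
def my_flow():
    # Only make_http_request is throttled; other tasks are unaffected.
    for _ in range(10):
        make_http_request.submit()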
Brendan Dalpe
08/13/2025, 7:42 PM
Robyn H
08/14/2025, 1:16 PM
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS to 1, vs. the default value of 30, would suddenly make all tasks check concurrency limits.
I'm only setting a tag on one task, following the documentation here. The expected behavior is that this only affects the scheduling of that one task, and when I remove the tag from that task, there is clearly no more concurrency control for it (my env variable is always set to 1 for the slot wait).
I've already also tried this with concurrency and saw the same issue (major latency increase even when there is a large number of slots).
Joe Mesterhazy
08/15/2025, 5:03 PM
Robyn H
08/18/2025, 7:24 AM