# ask-community
r
Hi all, I'm running a self-hosted server with PostgreSQL and Redis, testing a flow that tries to schedule tens of thousands of tasks concurrently. We want to evaluate Prefect for a type of workflow that will run hundreds of thousands (up to millions) of tasks. One of those tasks in our use case would need a concurrency limit, as it queries an external system. I've set
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS=1
. In my test flow, if I add a tag-based concurrency limit on one task, the flow's execution time increases significantly. I'm using asyncio to start many executions of task1 concurrently (with the default task runner), and task1 just calls task2, which has a tag with a concurrency limit. Even if I set the number of slots in the concurrency limit higher than the number of tasks (e.g. 1000 tasks, 1001 slots), I see a significant runtime increase (from 7s to 33s in my case). I assume this increased latency is due to database round trips and lock contention when acquiring slots; is that correct? In this test we're never even waiting on slots to become free, since there are always free slots (I set the slot count this high to verify that we see significant latency despite never actually waiting for tasks to free up slots, which was my suspicion). For e.g. 50k tasks, the execution time goes from 5min to 30min. Is there any way to optimize this so we don't see such major increases? Can I tune my database?
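For reference, the shape of the test flow is roughly this (a sketch with illustrative names; it assumes a tag-based limit has been created on the "external-system" tag, e.g. via `prefect concurrency-limit create external-system 1001`):

```python
import asyncio

from prefect import flow, task


@task(tags=["external-system"])  # this tag carries the concurrency limit
async def task2():
    ...  # the call to the external system


@task
async def task1():
    await task2()


@flow
async def stress_test(n: int = 1000):
    # fan out n concurrent executions of task1 on the default task runner
    await asyncio.gather(*(task1() for _ in range(n)))
```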
b
@Robyn H which version of Prefect do you have deployed?
r
@Brendan Dalpe 3.4.6. By the way, I've also implemented the same flow using an asyncio Semaphore, which is significantly more performant, so that could be a workaround - but ideally I'd like to use the Prefect concurrency control.
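For anyone curious, the Semaphore workaround is plain asyncio with no Prefect involved - a minimal sketch (illustrative names): at most `slots` coroutines enter the limited section at once, with no database round trips.

```python
import asyncio


async def query_external_system(i: int) -> int:
    await asyncio.sleep(0)  # placeholder for the real external call
    return i


async def limited(sem: asyncio.Semaphore, i: int) -> int:
    # at most `slots` coroutines are inside this block at any moment
    async with sem:
        return await query_external_system(i)


async def main(n: int = 1000, slots: int = 50) -> list[int]:
    sem = asyncio.Semaphore(slots)
    return list(await asyncio.gather(*(limited(sem, i) for i in range(n))))


results = asyncio.run(main())
```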
b
I'm wondering if you're running into this issue: https://github.com/PrefectHQ/prefect/issues/18654
j
Is this really possible with Prefect? From my testing, concurrency limits and
@task
all fall apart if you are launching thousands of short-lived tasks all at once. The backend spends more time housekeeping than actually running the tasks (also the UI grinds to a halt trying to render them). I would love to be wrong about this! (My testing was all self-hosted in GKE + Postgres)
r
@Brendan Dalpe I upgraded Prefect to 3.4.12 but there was no improvement, and we saw this behavior before we started using Redis. My flow runs in a single process, so we wouldn't expect large latency increases when adding a concurrency limit; each coroutine should be able to acquire a slot and then continue execution. Is this a constant overhead just from querying the concurrency limit? My experience unfortunately mirrors yours, @Joe Mesterhazy: performance falls apart as soon as I start running thousands of tasks.
j
@Brendan Dalpe have you tried switching from concurrency limits to rate limits? There was a pretty big improvement in speed when I did that.
b
Ok, I understand a little bit more. Setting
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS
is going to be really disadvantageous here because only one of your tasks needs concurrency. Setting this env means every task will query the concurrency limits. I agree with @Joe Mesterhazy here about taking a look at using
concurrency
or
rate_limit
to accomplish your requirement around throttling the single task: https://docs.prefect.io/v3/how-to-guides/workflows/global-concurrency-limits#using-rate-limit As an example, this would rate limit only inside the specific task:
from prefect import flow, task
from prefect.concurrency.sync import rate_limit

@task
def make_http_request():
    rate_limit("rate-limited-api")
    print("Making an HTTP request...")

@flow
def my_flow():
    for _ in range(10):
        make_http_request.submit()
Trying to load 10k+ tasks in the UI is not going to work well... Is there a reason, outside the original task concurrency limit, that everything needs to run as a task?
r
@Brendan Dalpe Thank you for your help! I can try it out with rate limits and will report back, but I'm confused as to why setting
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS
to 1, vs. the default value of 30, would suddenly make all tasks check concurrency limits. I'm only setting a tag on one task, following the documentation here. The expected behavior is that this only affects the scheduling of that one task; and when I remove the tag from that task, there is clearly no more concurrency control for it (my env variable is always set to 1 for the slot wait). I've also already tried this with
concurrency
and saw the same issue (a major latency increase even with a large number of free slots).
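For completeness, the `concurrency` variant I tested looked roughly like this (a sketch with an illustrative limit name; it assumes a global concurrency limit called "external-system" already exists):

```python
from prefect import task
from prefect.concurrency.sync import concurrency


@task
def query_external_system():
    # occupy one slot of the global limit for the duration of the call
    with concurrency("external-system", occupy=1):
        ...  # the actual query
```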
j
It sounds like you are trying to control concurrency for a single task, on a single worker? What I would do instead is decorate the function with a tenacity @retry decorator, and use asyncio and a semaphore to control concurrency (if it's async) or a simple thread pool if it's not (the calling function managing all this would be the task). If the task is called simultaneously by multiple workers, then the rate limit would be the best option.
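A sketch of the non-async variant of that idea (illustrative names; a plain retry loop stands in for tenacity's @retry): the thread pool's max_workers itself caps in-process concurrency, and the enclosing Prefect task would own the pool.

```python
from concurrent.futures import ThreadPoolExecutor


def call_with_retry(i: int, attempts: int = 3) -> int:
    # simple retry loop standing in for tenacity's @retry decorator
    for attempt in range(attempts):
        try:
            return i * 2  # placeholder for the external query
        except Exception:
            if attempt == attempts - 1:
                raise
    raise RuntimeError("unreachable")


def run_all(n: int = 100, max_workers: int = 10) -> list[int]:
    # pool size bounds how many queries run at once
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(call_with_retry, range(n)))


results = run_all()
```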
r
Thank you Joe! To be clear, I'm testing on a single worker because even there I see a large latency increase when using concurrency control, but the goal is to use multiple workers. So I'll try the rate limit and see if there is any improvement.