# ask-community
d
Hi everyone. I found a weird bottleneck in Prefect related to task concurrency tags and mapped tasks. I believe this might be a bug or an oversight. [MRE in thread] When one of the mapped tasks limited by a concurrency tag completes, it takes 10+ seconds for the next one to start. Is there a reason for this delay?
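import time

from prefect import flow, task, get_run_logger


@flow
def main():
    inputs = list(range(9))
    futures = t1.map(inp=inputs)  # one task run per input, all tagged 3_LIMIT
    # .result() blocks until each mapped run finishes and returns its value
    results = [future.result() for future in futures]
    get_run_logger().info(results)


# Tag limit created beforehand, e.g.: prefect concurrency-limit create 3_LIMIT 3
@task(tags=["3_LIMIT"])
def t1(inp):
    time.sleep(1)
    return inp * 10


if __name__ == "__main__":
    main()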
j
Hey, this is because of how the v1 implementation of concurrency limits works. When a task run cannot secure a concurrency slot from the server, the server instructs the client to wait 30s before trying to secure a slot again. If you run your flow in debug mode (e.g. with PREFECT_LOGGING_LEVEL=DEBUG), you should see logs like:
10:17:30.629 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the 3_LIMIT tag has been reached
10:17:30.631 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the 3_LIMIT tag has been reached
10:17:30.632 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the 3_LIMIT tag has been reached
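To see why a fixed wait hurts a mapped run, here is a toy discrete-event simulation. It is not Prefect's code. Every task that is refused a slot retries after a fixed `retry_delay`, so a slot freed in the meantime sits idle until the next retry. `simulate_fixed_retry` and its parameters are hypothetical. The model also assumes that all tasks ask for a slot at t=0 and take exactly `duration` seconds.

```python
import heapq


def simulate_fixed_retry(n_tasks: int, limit: int, duration: float, retry_delay: float) -> float:
    """Toy model (hypothetical): time the last task finishes when refused tasks retry after a fixed delay."""
    if retry_delay <= 0 or limit < 1:
        raise ValueError("retry_delay must be positive and limit at least 1")
    # Every task first asks for a slot at t=0; entries are (attempt_time, task_id).
    attempts = [(0.0, task_id) for task_id in range(n_tasks)]
    heapq.heapify(attempts)
    running_ends: list[float] = []  # min-heap of end times for tasks holding a slot
    makespan = 0.0
    while attempts:
        t, task_id = heapq.heappop(attempts)
        # Slots freed before this attempt are only noticed now, when a task asks again.
        while running_ends and running_ends[0] <= t:
            heapq.heappop(running_ends)
        if len(running_ends) < limit:
            end = t + duration
            heapq.heappush(running_ends, end)
            makespan = max(makespan, end)
        else:
            # Refused: the client waits retry_delay before asking again.
            heapq.heappush(attempts, (t + retry_delay, task_id))
    return makespan
```

Using the MRE's shape (9 tasks, limit 3, about 1 s each), the toy model reports a makespan of 61 s with a 30 s delay, 11 s with a 5 s delay, and 3 s with a 1 s delay. Real Prefect timings will differ. The point is only that the idle gap scales with the retry delay, not with the task duration.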
It might be reasonable to make that a configurable server-side (API) parameter, although the experience will likely vary depending on your number of tasks and the value you set.
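Here is one way such a setting could look, as a sketch only. The environment variable name `HYPOTHETICAL_CONCURRENCY_RETRY_SECONDS` and the helper `concurrency_retry_seconds` are invented for illustration. They are not Prefect settings. The sketch keeps the 30 s discussed here as the default and rejects non-positive or non-finite overrides.

```python
import math
import os
from typing import Mapping, Optional

# Hypothetical default mirroring the hard-coded 30s wait discussed in this thread.
DEFAULT_RETRY_SECONDS = 30.0


def concurrency_retry_seconds(env: Optional[Mapping[str, str]] = None) -> float:
    """Read a hypothetical retry-delay override, falling back to the 30s default."""
    env = os.environ if env is None else env
    raw = env.get("HYPOTHETICAL_CONCURRENCY_RETRY_SECONDS")  # invented name
    if raw is None:
        return DEFAULT_RETRY_SECONDS
    value = float(raw)
    if not math.isfinite(value) or value <= 0:
        raise ValueError("retry delay must be a positive, finite number of seconds")
    return value
```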
d
But the task knows it's completed. I don't see why it can't clear its "lock" at the end.
Having a hard wait kinda seems unnecessary in this case.
j
The lock is cleared server side, as you described, but the local orchestration context has already been told "try again in 30s".
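For contrast, here is the wake-on-release behaviour the question asks for, shown inside a single Python process. With an `asyncio.Semaphore`, a waiter resumes as soon as a slot is released, so nine 0.1 s tasks with a limit of 3 finish in roughly 0.3 s. This is only an in-process analogy, and `run_mapped_with_semaphore` is a hypothetical name. Prefect's tag limits are enforced by the server across processes and machines, which a local semaphore cannot do.

```python
import asyncio
import time


async def run_mapped_with_semaphore(n_tasks: int, limit: int, duration: float):
    """Hypothetical helper: run n_tasks sleeps behind an in-process semaphore.

    Returns (results, elapsed_seconds, peak_concurrency).
    """
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker(inp: int) -> int:
        nonlocal active, peak
        async with sem:  # a waiting worker resumes as soon as a slot is released
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(duration)  # stands in for time.sleep(1) in the MRE
            active -= 1
        return inp * 10

    start = time.perf_counter()
    results = await asyncio.gather(*(worker(i) for i in range(n_tasks)))
    elapsed = time.perf_counter() - start
    return list(results), elapsed, peak
```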
d
Ah, I just looked at the code. Why is it 30 seconds?
Seems like 30 seconds is a long time.
I was optimizing my code because a flow took 50 seconds to execute when the tasks ran sequentially. After mapping, it's 3+ minutes now :x
j
I believe it was chosen as a reasonable initial value. Like I said above, I don't think it's unreasonable to make it a setting on the server.
d
Always use OSS, so you can break the database 😄
@Jake Kaplan that sounds good. Any reason something lower, like 5 seconds, wasn't chosen?
It's just minor server-side processing plus a DB query, right?
j
Nothing special about 30s that I know of. Based on the commit history, I believe it's been that way since Prefect 2 was first released.
d
Ah, thanks. Could you expose the value as a configuration setting, or bring this up with the other devs?
j
Would you mind filing an issue for it? I can take a look. Additionally, I think this level of control is covered by the v2 concurrency implementation.
d
I believe v2 hasn't been fully released yet. I'll create a GitHub issue. Thanks @Jake Kaplan
Created one.
Any updates on this? @Jake Kaplan
Re-pinging @Jake Kaplan
j
Hey! Sorry for missing your original message. This is still being prioritized internally, especially because concurrency v2 is looking to supplant this, but it is still on the backlog. The original issue is the best place to follow for updates: https://github.com/PrefectHQ/prefect/issues/10460