Deceivious

08/21/2023, 8:59 AM
Hi everyone. I found a weird bottleneck in Prefect related to task concurrency tags and mapped tasks. I believe this might be a bug or an oversight. [MRE in thread] When a mapped task run that is limited by a concurrency tag completes, it takes 10+ seconds for the next one to start. Is there any reason behind this delay?
import time

from prefect import flow, get_run_logger, task


@flow
def main():
    inputs = list(range(9))
    futures = t1.map(inp=inputs)
    # .result() blocks until each mapped task run finishes and returns its value
    results = [future.result() for future in futures]
    get_run_logger().info(results)


# Limit set to 3, e.g. created with: prefect concurrency-limit create 3_LIMIT 3
@task(tags=["3_LIMIT"])
def t1(inp):
    time.sleep(1)
    return inp * 10


if __name__ == "__main__":
    main()

Jake Kaplan

08/21/2023, 2:19 PM
Hey, this is because of how the v1 version of concurrency is implemented. When a task run cannot secure a concurrency slot from the server, the server instructs the client to wait 30s before trying to secure a slot again. If you run your flow in debug mode (e.g. with PREFECT_LOGGING_LEVEL=DEBUG), you should see logs like:
10:17:30.629 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the 3_LIMIT tag has been reached
10:17:30.631 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the 3_LIMIT tag has been reached
10:17:30.632 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the 3_LIMIT tag has been reached
It might be reasonable to make that a configurable API-side parameter, although the experience will likely vary depending on your number of tasks, what you set it to, etc.
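A configurable server-side wait could look roughly like the sketch below. The setting name TAG_CONCURRENCY_WAIT_SECONDS and both helper functions are hypothetical, for illustration only; they are not existing Prefect settings or APIs. The only values taken from the thread are the 30s default and the wording of the debug log reason.

```python
import os

# Hypothetical setting name, for illustration only (not an existing Prefect setting).
WAIT_SETTING = "TAG_CONCURRENCY_WAIT_SECONDS"
DEFAULT_WAIT_SECONDS = 30.0  # the value hard-coded today, per the thread


def concurrency_wait_seconds(environ=os.environ):
    """Read the retry delay from the environment, falling back to 30s."""
    raw = environ.get(WAIT_SETTING)
    if raw is None:
        return DEFAULT_WAIT_SECONDS
    value = float(raw)
    if value <= 0:
        raise ValueError(f"{WAIT_SETTING} must be positive, got {value}")
    return value


def wait_instruction(tag, environ=os.environ):
    """Hypothetical helper: build the (delay, reason) pair sent to a denied client."""
    delay = concurrency_wait_seconds(environ)
    reason = f"Concurrency limit for the {tag} tag has been reached"
    return delay, reason
```

Passing a plain dict as environ keeps the helper easy to test without touching the real process environment.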

Deceivious

08/21/2023, 2:21 PM
But the task knows it's completed. I don't see why it can't clear its "lock" at the end.
Having a hard wait kinda seems unnecessary in this case.
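For comparison, the hand-off described here (a finished run frees its slot and a waiting run starts right away) is what an in-process blocking semaphore provides. This sketch uses only Python's standard threading module as an analogy, mirroring the MRE's 9 inputs, limit of 3 and inp * 10, with a shortened sleep; it is not how Prefect enforces tag concurrency limits, which go through the API server. The function name run_mapped_with_semaphore is made up for illustration.

```python
import threading
import time


def run_mapped_with_semaphore(inputs, limit, duration):
    """Run one thread per input, with at most `limit` holding a slot at once."""
    slots = threading.BoundedSemaphore(limit)
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}
    results = [None] * len(inputs)

    def worker(idx, inp):
        # Blocks until a slot is released, then wakes without a fixed delay.
        with slots:
            with lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(duration)  # stand-in for the task body
            results[idx] = inp * 10
            with lock:
                state["active"] -= 1

    threads = [threading.Thread(target=worker, args=(i, x)) for i, x in enumerate(inputs)]
    start = time.monotonic()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results, state["peak"], time.monotonic() - start
```

With 9 inputs, a limit of 3 and 0.05s per run, the batch should finish in roughly 0.15s plus thread overhead, since each release wakes a blocked thread immediately.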

Jake Kaplan

08/21/2023, 2:21 PM
The lock is cleared server-side as you described, but the local orchestration context has already been told "try again in 30s".
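A minimal sketch of that behaviour, under simplifying assumptions: every mapped run requests a slot at t=0, holds it for duration seconds, a denied run retries exactly wait seconds later, and server round trips take no time. The function simulate_tag_limit and the model are illustrative, not Prefect code.

```python
import heapq


def simulate_tag_limit(n_tasks, limit, duration, wait):
    """Simplified model of v1 tag concurrency with a fixed retry wait.

    Returns (makespan_seconds, slot_requests).
    """
    pending = [(0.0, task_id) for task_id in range(n_tasks)]  # (next request time, run id)
    heapq.heapify(pending)
    running_until = []  # end times of runs currently holding a slot
    requests = 0
    makespan = 0.0
    while pending:
        now, task_id = heapq.heappop(pending)
        requests += 1
        # The server frees a slot as soon as its run finishes...
        running_until = [end for end in running_until if end > now]
        if len(running_until) < limit:
            running_until.append(now + duration)
            makespan = max(makespan, now + duration)
        else:
            # ...but a denied run only asks again after the full wait.
            heapq.heappush(pending, (now + wait, task_id))
    return makespan, requests
```

With the MRE's numbers (9 runs, limit 3, 1s each), this model gives a makespan of 61s for a 30s wait, 11s for a 5s wait, and 3s for a 1s wait, with 18 slot requests in each case.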

Deceivious

08/21/2023, 2:22 PM
Ah, just looked at the code. Why is it 30 seconds?
Seems like 30 seconds is a long time.
I was optimizing my code because a flow took 50 seconds to execute when the tasks ran sequentially. After mapping, it's 3+ minutes now :x

Jake Kaplan

08/21/2023, 2:25 PM
I believe it was chosen as an initial reasonable value. Like I said above, I don't think it's unreasonable to make it a setting on the server

Deceivious

08/21/2023, 2:25 PM
Always use OSS, so u can break the database 😄
@Jake Kaplan that sounds good. Any reason something lower, like 5 seconds, wasn't chosen?
It's just minor server-side processing plus a DB query, right?
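Under a simple polling model, each retry is one more slot request to the server, so the wait value trades request volume against idle slot time. A back-of-the-envelope sketch, assuming a run denied at t=0 retries every wait seconds and the slot it needs frees at free_at; the function name and model are illustrative assumptions, not Prefect internals.

```python
import math


def acquire_after_slot_frees(free_at, wait):
    """Model one run denied a slot at t=0 that retries every `wait` seconds.

    Returns (acquire_time, retries, idle_seconds):
      acquire_time: first retry at or after the slot frees
      retries:      slot requests sent after the initial denial
      idle_seconds: time the freed slot sat unused before this run claimed it
    """
    if free_at <= 0 or wait <= 0:
        raise ValueError("free_at and wait must be positive")
    retries = math.ceil(free_at / wait)
    acquire_time = retries * wait
    return acquire_time, retries, acquire_time - free_at
```

For a slot that frees after 1 second (as in the MRE), a 30s wait leaves it idle for 29s while a 5s wait cuts that to 4s; for a run blocked for a full minute, the 5s wait costs 12 requests instead of 2.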

Jake Kaplan

08/21/2023, 2:30 PM
Nothing special about 30s that I know of. I believe it's been that way since Prefect 2 was first released, judging by the commit.

Deceivious

08/21/2023, 2:32 PM
Ah, thanks. Can you please expose the variable as a configuration setting? Or bring this up with the other devs?

Jake Kaplan

08/21/2023, 2:38 PM
Would you mind filing an issue for it? I can take a look. Additionally, I think this level of control is covered by the v2 concurrency implementation.

Deceivious

08/21/2023, 2:41 PM
I believe V2 hasn't been fully released yet. I'll create a GitHub issue. Thanks @Jake Kaplan
created one
Any updates on this? @Jake Kaplan