Shane Breeze
11/06/2023, 8:25 AM
2.14.3
I’ve noticed that the global concurrency is blocking tasks longer than needed. I have 1 flow submitting multiple tasks like so:
from prefect import task, flow
from prefect.concurrency.asyncio import concurrency

@task(...)
async def my_task(...):
    print(f"Checking for available concurrency limit slot from {tags}")
    # Waits here until a slot in the limit(s) named by `tags` is free
    async with concurrency(tags):
        print(f"Acquired concurrency limit slot from {tags}")
        ...
        print(f"Releasing concurrency limit slot from {tags}")

@flow(...)
async def my_flow(tasks):
    # Submit every task; each one competes for a slot on its own
    for task in tasks:
        await task.submit(wait_for=...)
The first set of tasks runs as expected, as there are slots available in the global concurrency limit. Occasionally, when they finish and release their slots, the other tasks do not acquire these slots, even though they’re free. How long does concurrency wait between attempts to acquire a slot?
Chris Pickett
11/06/2023, 2:49 PM
The concurrency manager tries to predict when a new slot will be free, based on a moving average of the previous slot acquire/release times. So if your tasks vary a lot in duration, then when a fast task completes it might take a while before the manager tries to grab a new slot.
Shane Breeze
11/06/2023, 3:15 PM
concurrency(tags, polling=10))?
Chris Pickett
11/06/2023, 3:27 PM
rate_limit, that wouldn’t change the polling frequency, but it would ensure that slots are released at a regular pace, and the predictive acquisition works better because of that. But if you actually need to make sure that only 1 of these tasks is running at a time then this doesn’t solve your problem.
Chris Pickett
11/06/2023, 3:27 PM
Chris Pickett
11/06/2023, 3:38 PM
Ying Ting Loo
11/22/2023, 9:38 AM
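
The behaviour described earlier in the thread (a retry delay predicted from a moving average of past slot hold times) can be illustrated with a small standalone sketch. This is not Prefect's implementation: `SlotWaitEstimator`, `predicted_delay`, the window size, and the sample durations are all hypothetical, chosen only to show why highly variable task durations can leave a free slot unclaimed for a while, and why evenly paced releases make the prediction line up.

```python
from collections import deque


class SlotWaitEstimator:
    """Hypothetical toy model, not Prefect code: predicts the next retry
    delay as a moving average of recent slot hold times."""

    def __init__(self, window: int = 5) -> None:
        # Only the most recent `window` hold times are remembered.
        self.hold_times = deque(maxlen=window)

    def record(self, seconds_held: float) -> None:
        self.hold_times.append(seconds_held)

    def next_retry_delay(self, default: float = 1.0) -> float:
        # With no history yet, fall back to a fixed default delay.
        if not self.hold_times:
            return default
        return sum(self.hold_times) / len(self.hold_times)


def predicted_delay(hold_times):
    """Feed a sequence of hold times in and return the predicted delay."""
    estimator = SlotWaitEstimator()
    for held in hold_times:
        estimator.record(held)
    return estimator.next_retry_delay()


# Variable durations: a 2 s task has just freed its slot, but the average
# is dominated by the slow tasks, so a waiter would sleep far longer.
variable = predicted_delay([60.0, 58.0, 62.0, 2.0])

# Evenly paced releases: the prediction matches the real release interval.
steady = predicted_delay([10.0, 10.0, 10.0, 10.0])

print(f"variable durations: retry in ~{variable:.1f}s")
print(f"steady releases:    retry in ~{steady:.1f}s")
```

In this toy model the waiter in the variable case would sleep about 45.5 seconds even though a slot was freed after 2 seconds, which is in line with the symptom reported at the top of the thread. How Prefect actually weights or bounds its estimate is not stated in the thread, so treat the numbers as illustrative only.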