Ronald Sam
03/15/2025, 6:44 PMimport asyncio
from prefect import flow, task, get_run_logger
from prefect.concurrency.sync import concurrency
from random import randint, randrange
@task
async def my_coroutine(sem):
async with sem:
# critical section of code
sleeptime = randint(5,15)
print(f"Acquired - {sleeptime} sec")
await asyncio.sleep(sleeptime)
print(f"Released - {sleeptime} sec")
@flow
async def main():
sem = asyncio.Semaphore(3)
tasks = [asyncio.create_task(my_coroutine(sem)) for _ in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
Ryan Peden
03/15/2025, 7:32 PMfrom prefect.cache_policies import NO_CACHE
@task(cache_policy=NO_CACHE)
async def my_coroutine(sem):
#rest of your code
Another option is to let Prefect manage the maximum concurrency by giving your task a tag and then seting a concurrency limit on the tag. You'd use the tags
arg in the task decorator to add a tag, something like:
@task(tags=["my_async_task"])
async def my_important_task():
#rest of your code
There are a few ways you can set the concurrency limit on a tag: https://docs.prefect.io/v3/develop/task-run-limits
In this case, you could run something like this in your terminal:
prefect concurrency-limit create my_async_task 3
and then alter your flow and task to get rid of the manual concurrency handling. This might be a more "Prefecty" way to do it - let the orchestrator handle the complexity of concurrency management so your code doesn't need to deal with it.