Ronald Sam
03/15/2025, 6:44 PM
import asyncio
from prefect import flow, task, get_run_logger
from prefect.concurrency.sync import concurrency
from random import randint, randrange

@task
async def my_coroutine(sem):
    async with sem:
        # critical section of code
        sleeptime = randint(5,15)
        print(f"Acquired - {sleeptime} sec")
        await asyncio.sleep(sleeptime)
        print(f"Released - {sleeptime} sec")

@flow
async def main():
    sem = asyncio.Semaphore(3)
    tasks = [asyncio.create_task(my_coroutine(sem)) for _ in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())
Ryan Peden
03/15/2025, 7:32 PM
from prefect.cache_policies import NO_CACHE

@task(cache_policy=NO_CACHE)
async def my_coroutine(sem):
    # rest of your code
Another option is to let Prefect manage the maximum concurrency by giving your task a tag and then setting a concurrency limit on the tag. You'd use the tags arg in the task decorator to add a tag, something like:
@task(tags=["my_async_task"])
async def my_important_task():
    # rest of your code
There are a few ways you can set the concurrency limit on a tag: https://docs.prefect.io/v3/develop/task-run-limits
In this case, you could run something like this in your terminal:
prefect concurrency-limit create my_async_task 3
and then alter your flow and task to get rid of the manual concurrency handling. This might be a more "Prefecty" way to do it - let the orchestrator handle the complexity of concurrency management so your code doesn't need to deal with it.