<@ULVA73B9P> when running a flow in prefect with a...
# ask-marvin
o
@Marvin when running a flow in prefect with async tasks, do they respect concurrency limits (e.g., you have a limit of 5 tasks for a given tag, and schedule 10 async tasks - do they keep it at a limit of 5 and add in new ones once one completes?).
m
Short answer: yes. In Prefect 3.x, tag-based concurrency limits are enforced by the server when a task run tries to enter a Running state. If you set a limit of 5 on a tag and submit 10 Prefect tasks with that tag, only 5 will acquire slots and start; the rest will wait and be retried until a slot frees, at which point new ones will start. This applies whether the task function is async or sync; the async nature doesn’t bypass the limit.

Key points and caveats:
- Limits are global across your Prefect server/Cloud workspace: they cap the number of Running task runs with that tag across all flow runs and workers.
- The limit applies only to Prefect task runs (i.e., functions decorated with @task). Pure asyncio coroutines you launch yourself (not Prefect tasks) are not subject to Prefect’s concurrency limits.
- Your worker’s capacity/task runner still matters; the tag-based limit is an upper bound. The worker won’t exceed its own parallelism even if slots are available.

Example
1) Create the concurrency limit:
prefect concurrency-limit create limited 5
2) Tag your task and submit 10 of them:
from prefect import flow, task
import asyncio

@task(tags={"limited"})
async def do_work(i: int) -> int:
    await asyncio.sleep(1)
    return i

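@flow
def my_flow():
    # Submit all 10 task runs up front; the limit on the "limited" tag caps how many run at once
    futures = [do_work.submit(i) for i in range(10)]
    # Block until every task run has finished and collect the results in submission order
    return [f.result() for f in futures]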
Only 5 do_work task runs will be Running at once; as one finishes, another queued task run will start.
Inspect active slots:
prefect concurrency-limit inspect limited
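On the caveat about plain asyncio coroutines: since Prefect's tag limit does not cover them, you would have to cap them yourself. Below is a minimal sketch of one way to do that with asyncio.Semaphore, using only the standard library. It is an illustration of the "5 at a time, start the next one when a slot frees" behavior, not how Prefect implements its limits. The names plain_coroutine, run_all, LIMIT, and N_JOBS are hypothetical and chosen to mirror the numbers in the example above.

```python
import asyncio

LIMIT = 5    # assumption: mirrors the tag limit of 5 used in the example
N_JOBS = 10  # assumption: mirrors the 10 submitted task runs


async def plain_coroutine(i, sem, stats):
    # Hypothetical worker: a plain coroutine, NOT a Prefect task,
    # so Prefect's tag-based limit would not apply to it.
    async with sem:  # wait here until one of the LIMIT slots is free
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        try:
            await asyncio.sleep(0.05)  # stand-in for real async work
            return i
        finally:
            stats["running"] -= 1  # leaving the block frees the slot


async def run_all():
    sem = asyncio.Semaphore(LIMIT)
    stats = {"running": 0, "peak": 0}
    # Schedule all coroutines at once; the semaphore gates how many proceed.
    results = await asyncio.gather(
        *(plain_coroutine(i, sem, stats) for i in range(N_JOBS))
    )
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(run_all())
    print("results:", results)
    print("peak concurrency:", peak)
```

All 10 coroutines are scheduled immediately, but the peak number inside the guarded section never exceeds LIMIT. Unlike the Prefect tag limit, this cap is local to one process and one event loop; it is not shared across flow runs or workers.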
Docs: Tag-based concurrency limits