Fredrik Hoem Grelland
09/26/2025, 12:44 PM
Marvin
09/26/2025, 12:47 PM
from prefect import flow, task

@task(tags=["etl-io"])
def fetch_one(i):
    ...

@flow
def parent(items):
    fetch_one.map(items)  # mapped children will honor the tag limit
Then set the limit via CLI:
prefect concurrency-limit create etl-io 5
prefect concurrency-limit ls
prefect concurrency-limit inspect etl-io
Docs: Tag-based concurrency limits
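To make the semantics of a tag limit concrete, here is a rough stdlib-only analogy, not Prefect's actual implementation. It assumes the limit behaves like a shared pool of 5 slots that every task run tagged etl-io must hold while running, no matter which flow run submitted it. The names TAG_LIMITS, run_tagged, and simulate are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a server-side tag limit: one shared slot pool per tag.
TAG_LIMITS = {"etl-io": threading.BoundedSemaphore(5)}

_lock = threading.Lock()
_running = 0
peak_running = 0  # highest number of tagged runs observed at the same time


def run_tagged(tag, i):
    """Run one unit of work while holding a slot for `tag`."""
    global _running, peak_running
    with TAG_LIMITS[tag]:
        with _lock:
            _running += 1
            peak_running = max(peak_running, _running)
        time.sleep(0.01)  # simulated I/O
        with _lock:
            _running -= 1
    return i


def simulate(n_items=20, n_callers=2):
    """Several independent callers (think: flow runs) share one tag limit."""
    pools = [ThreadPoolExecutor(max_workers=10) for _ in range(n_callers)]
    try:
        futures = [p.submit(run_tagged, "etl-io", i) for p in pools for i in range(n_items)]
        return [f.result() for f in futures]
    finally:
        for p in pools:
            p.shutdown()


results = simulate()
```

Even though the two pools together could run 20 threads, peak_running never exceeds 5, which is the cross-flow behavior the tag limit is meant to give you.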
2) Limit the flow’s task runner concurrency (simple, code-only)
- Caps concurrent task runs within a single flow run process (affects all tasks in that flow run)
Example:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def fetch_one(i):
    ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent(items):
    fetch_one.map(items)
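As a sketch of why this cap affects all tasks in the flow run, the stdlib example below uses concurrent.futures.ThreadPoolExecutor as a stand-in for the task runner (an analogy, not Prefect code). PeakCounter and transform_one are hypothetical names added for illustration: two different task types share the same 5 workers.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PeakCounter:
    """Context manager that records the highest number of concurrent entries."""

    def __init__(self):
        self._lock = threading.Lock()
        self.current = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)
        return self

    def __exit__(self, *exc):
        with self._lock:
            self.current -= 1
        return False


counter = PeakCounter()


def fetch_one(i):
    with counter:
        time.sleep(0.01)  # simulated I/O
        return i


def transform_one(i):
    with counter:
        time.sleep(0.01)
        return i * 2


# One pool with 5 workers serves both task types, so the cap is shared.
with ThreadPoolExecutor(max_workers=5) as pool:
    fetch_futures = [pool.submit(fetch_one, i) for i in range(10)]
    transform_futures = [pool.submit(transform_one, i) for i in range(10)]
    fetched = [f.result() for f in fetch_futures]
    doubled = [f.result() for f in transform_futures]
```

The pool cannot tell the two task types apart, so if you need different caps per task type, the tag approach is the better fit.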
Notes and tips:
- Use tags if you want a per-task-type cap (and to enforce limits across flows/workers). Use the task runner if you just want a local cap within one flow run.
- Work pool/queue concurrency limits affect how many flow runs get picked up, not how many mapped children run inside a flow, so they won’t solve mapped task concurrency.
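- If you prefer a code-only throttle without tags, another quick trick is batching: map one chunk at a time and wait for it to finish before submitting the next:
from itertools import islice

def batched(iterable, n):
    # Yield successive lists of up to n items
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

# Inside the flow body:
for chunk in batched(items, 5):
    # Wait for each batch (Prefect 3.x future list); without the wait,
    # every chunk is submitted immediately and nothing is throttled
    fetch_one.map(chunk).wait()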
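The batching idea can be checked without Prefect. The sketch below is a stdlib-only illustration: run_in_batches is a hypothetical helper, and the executor stands in for whatever actually runs the tasks. The key point is the wait between batches, which is what keeps at most batch_size items in flight.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from itertools import islice


def batched(iterable, n):
    """Yield successive lists of up to n items from iterable."""
    it = iter(iterable)
    while chunk := list(islice(it, n)):
        yield chunk


def run_in_batches(fn, items, batch_size, pool):
    """Submit one batch at a time and block until it completes."""
    results = []
    for chunk in batched(items, batch_size):
        futures = [pool.submit(fn, x) for x in chunk]
        wait(futures)  # do not start the next batch until this one is done
        results.extend(f.result() for f in futures)
    return results


# The pool is deliberately oversized; the batching alone limits concurrency.
with ThreadPoolExecutor(max_workers=32) as pool:
    squares = run_in_batches(lambda x: x * x, range(7), 3, pool)
```

One trade-off: each batch runs only as fast as its slowest item, so a tag limit or task runner cap usually keeps workers busier than fixed batches do. On Python 3.12+, itertools.batched offers the same chunking but yields tuples instead of lists.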
Would you like this limit to apply just within a single flow run or across all runs of this task? And are you running on Prefect Cloud/Server? I can tailor the recommendation exactly to your setup.