@Marvin I am using a .map operator to my task...
# ask-marvin
f
@Marvin I am using the .map operator on my task, but now I want to control how many mapped runs execute at the same time. How can I do that?
m
Great question! In Prefect 3.x there are a few good ways to cap how many mapped task runs execute at once. The two most common are:

1) Tag-based concurrency limits (recommended for precise control)
- Tag the task you’re mapping
- Create a concurrency limit for that tag; extra mapped children will wait in AwaitingConcurrencySlot

Example:
from prefect import flow, task

@task(tags=["etl-io"])
def fetch_one(i):
    ...

@flow
def parent(items):
    # Mapped children will honor the tag limit. Wait on the futures so the
    # flow does not return before the mapped runs finish.
    fetch_one.map(items).wait()
Then set the limit via CLI:
prefect concurrency-limit create etl-io 5
prefect concurrency-limit ls
prefect concurrency-limit inspect etl-io
Docs: Tag-based concurrency limits

2) Limit the flow’s task runner concurrency (simple, code-only)
- Caps concurrent task runs within a single flow run process (affects all tasks in that flow run)

Example:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def fetch_one(i):
    ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent(items):
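    # At most 5 mapped runs execute at once; wait so the flow does not
    # return before they finish.
    fetch_one.map(items).wait()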
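To see what a worker cap like `max_workers=5` does in isolation, here is a minimal standard-library sketch. It does not use Prefect at all: `run_capped` and `job` are hypothetical names, and the sleep is a stand-in for real I/O. It only illustrates the general idea of a bounded thread pool, not how Prefect's task runner is implemented.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_capped(items, max_workers):
    """Run a dummy job per item on a bounded pool; return (results, peak concurrency)."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def job(i):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)  # record the highest overlap seen so far
        time.sleep(0.01)  # stand-in for real I/O work
        with lock:
            running -= 1
        return i * 2

    # The pool never runs more than max_workers jobs at the same time.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(job, items))  # results keep input order
    return results, peak


results, peak = run_capped(range(20), max_workers=5)
print(f"peak concurrency: {peak}")
```

All 20 items complete, but the measured peak never exceeds the pool size.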
Notes and tips:
- Use tags if you want a per-task-type cap (and to enforce limits across flows/workers). Use the task runner if you just want a local cap within one flow run.
- Work pool/queue concurrency limits affect how many flow runs get picked up, not how many mapped children run inside a flow, so they won’t solve mapped task concurrency.
- If you prefer a code-only throttle without tags, another quick trick is batching:
from itertools import islice

def batched(iterable, n):
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

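# Run this loop inside your flow. Waiting on each chunk is what throttles:
# without it, every chunk is submitted immediately and all items run at once.
for chunk in batched(items, 5):
    fetch_one.map(chunk).wait()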
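As a quick sanity check of the chunking itself, this standalone sketch shows what a helper like `batched` yields for a small input. It is a compact restatement of the helper above, not Prefect code, and the input `range(12)` is just an illustrative assumption. On Python 3.12+, `itertools.batched` does the same job, though it yields tuples rather than lists.

```python
from itertools import islice


def batched(iterable, n):
    """Yield successive lists of up to n items from iterable."""
    it = iter(iterable)
    # Stop as soon as islice returns nothing, i.e. the iterator is exhausted.
    while chunk := list(islice(it, n)):
        yield chunk


chunks = list(batched(range(12), 5))
print(chunks)  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11]]
```

One trade-off to keep in mind: each chunk has to finish before the next one starts, so a single slow item holds up its whole batch, whereas a concurrency limit refills slots as soon as they free up.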
Would you like this limit to apply just within a single flow run or across all runs of this task? And are you running on Prefect Cloud/Server? I can tailor the recommendation exactly to your setup.