@Marvin I want to use `task.map(obj)` but I ...
# ask-marvin
@Marvin I want to use `task.map(obj)`, but I don't want Prefect to spin up 2000 tasks at the same time. It would be ideal if there's a way to limit the concurrency to something like 50 parallel task runs at any given moment. Is that achievable, or should I handle it manually, for example in batches?
thought for 114.5 seconds
Great question. Short answer: yes, you can cap parallel task runs to ~50 without manual batching. A couple of options in Prefect 3.x:

1) Limit concurrency within a single flow run
- Use a task runner with a fixed worker count. This constrains how many tasks from that flow run execute at once.
- Example:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def do_work(x):
    # your logic here
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))
def main(items):
    # submit one task run per item; `do_work.map(items)` is an equivalent shorthand (see the `.map` sketch below)
    futures = [do_work.submit(i) for i in items]
    return [f.result() for f in futures]

if __name__ == "__main__":
    main(list(range(2000)))
```
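- Since your original question was about `task.map`: `.map` is still available in Prefect 3.x and respects the same `max_workers` cap, so you can keep that style if you prefer. A minimal sketch of the same flow written with `.map` (purely a shorthand for the comprehension above):

```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def do_work(x):
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))
def main(items):
    # .map submits one task run per item; the runner still caps
    # execution at ~50 concurrent task runs
    futures = do_work.map(items)
    return [f.result() for f in futures]

if __name__ == "__main__":
    main(list(range(2000)))
```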
- Submitting 2,000 tasks is fine; only 50 will run concurrently and the rest will queue in the runner.

2) Enforce a workspace-wide cap (across all flows)
- Use a concurrency limit tied to a tag. This ensures only N tasks with that tag run simultaneously anywhere in your workspace.
- One-time setup:
```bash
prefect concurrency-limit create limited-50 50
```
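- To double-check the limit later, the CLI should also have inspect/list subcommands (run `prefect concurrency-limit --help` to confirm the exact names on your version):

```bash
prefect concurrency-limit inspect limited-50
prefect concurrency-limit ls
```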
- Tag your task:
```python
from prefect import task

@task(tags=["limited-50"])
def do_work(x):
    ...
```
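- If you want to see the two controls side by side: the tag caps `do_work` across every flow run in your workspace, while the task runner caps concurrency inside a single flow run, and the two compose. A minimal sketch, assuming the `limited-50` limit created above already exists:

```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(tags=["limited-50"])  # counted against the workspace-wide tag limit
def do_work(x):
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))  # per-flow-run cap
def main(items):
    futures = [do_work.submit(i) for i in items]
    return [f.result() for f in futures]

if __name__ == "__main__":
    main(list(range(2000)))
```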
- Docs: Global concurrency limits

Notes and tips
- If your work is CPU-bound and you need true parallelism beyond threads, consider a distributed runner (e.g., Dask or Ray) and size the cluster/executors to ~50 workers:
  - Dask: Prefect Dask integration
  - Ray: Prefect Ray integration
- `task.map` is available in Prefect 3.x as well; `do_work.map(items)` is equivalent to the `task.submit` comprehension and respects the same worker limit (see the `.map` sketch under the first example).
- If you want to stream results as they finish instead of blocking on the whole list at the end, you can iterate with `as_completed` while still using a 50-worker runner; there's a sketch at the end of this message.

Quick check: are you on Prefect 3.x? If you're on 1.x/2.x, the concurrency controls differ a bit, so let me know your version and I can suggest the equivalent pattern, but I'd recommend the 3.x approach above.
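For the streaming note above, here's a minimal sketch. It assumes `as_completed` is importable from `prefect.futures` in your 3.x release; if it isn't in yours, calling `.result()` on each future in order works too, it just won't yield results in completion order:

```python
from prefect import flow, task
from prefect.futures import as_completed  # assumed available; check your 3.x release
from prefect.task_runners import ThreadPoolTaskRunner

@task
def do_work(x):
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))
def main(items):
    futures = [do_work.submit(i) for i in items]
    # handle each result as soon as its task run finishes,
    # instead of blocking on the whole list at the end
    for future in as_completed(futures):
        print(future.result())

if __name__ == "__main__":
    main(list(range(2000)))
```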