Idriss Bellil
08/21/2025, 9:44 AM
task.map(obj)
but I don't want Prefect to spin up 2000 tasks at the same time. Ideally there would be a way to limit concurrency to about 50 parallel task runs at any given moment. Is that achievable, or should I handle it manually, in batches for example?
Marvin
08/21/2025, 9:44 AM
Marvin
08/21/2025, 9:46 AM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def do_work(x):
    # your logic here
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))
def main(items):
    # Submit one task run per item; the runner's 50 worker threads cap concurrency
    futures = [do_work.submit(i) for i in items]
    return [f.result() for f in futures]

if __name__ == "__main__":
    main(list(range(2000)))
- Submitting 2,000 tasks is fine; only 50 will run concurrently and the rest will queue in the runner.
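The queueing behaviour described above can be checked without Prefect. The sketch below is a standard-library analog, not Prefect code: it assumes ThreadPoolTaskRunner behaves like a concurrent.futures.ThreadPoolExecutor with the same max_workers, and the helper names run_capped and tracked_work are made up for illustration. It submits 2,000 jobs up front and records the peak number running at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_capped(items, max_workers=50):
    """Submit every item at once; return (results, peak concurrency observed)."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def tracked_work(x):
        # Hypothetical stand-in for a task body; counts how many run at once.
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.001)  # stand-in for real I/O-bound work
        with lock:
            running -= 1
        return x * 2

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns immediately; jobs beyond max_workers wait in the pool's queue.
        futures = [pool.submit(tracked_work, i) for i in items]
        results = [f.result() for f in futures]
    return results, peak


results, peak = run_capped(range(2000), max_workers=50)
print(len(results), peak <= 50)
```

If the assumption holds, the same reasoning applies to the flow above: all 2,000 submissions are accepted right away, but no more than 50 bodies execute at any moment.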
2) Enforce a workspace-wide cap (across all flows)
- Use a concurrency limit tied to a tag. This ensures only N tasks with that tag run simultaneously anywhere in your workspace.
- One-time setup:
prefect concurrency-limit create limited-50 50
- Tag your task:
from prefect import task

@task(tags=["limited-50"])
def do_work(x):
    ...
- Docs: Global concurrency limits
Notes and tips
- If your work is CPU-bound and you need true parallelism beyond threads, consider a distributed runner (e.g., Dask/Ray) and size the cluster/executors to ~50 workers:
- Dask: Prefect Dask integration
- Ray: Prefect Ray integration
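- In Prefect 3.x, task.map is also available and goes through the same task runner, so the max_workers cap applies to it too. Using task.submit in a comprehension or loop, as shown above, works equally well.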
- If you want to stream results and avoid materializing all futures at once, you can iterate with as_completed while still using a 50-worker runner.
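A rough sketch of the as_completed pattern, again using the standard library as a stand-in rather than Prefect itself. It assumes Prefect's as_completed helper (which I believe lives in prefect.futures in 3.x) behaves like concurrent.futures.as_completed, yielding futures as they finish; stream_results is a hypothetical helper name. Note that the futures are still all created up front here; what gets streamed is the results.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def do_work(x):
    time.sleep(0.001)  # stand-in for real work
    return x * 2


def stream_results(items, max_workers=50):
    """Yield each result as soon as its job finishes (completion order, not input order)."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(do_work, i) for i in items]
        for future in as_completed(futures):
            yield future.result()


total = 0
count = 0
for value in stream_results(range(200)):
    total += value  # handle each result immediately instead of building a list
    count += 1
print(count, total)
```

Because results arrive in completion order, anything that depends on input order would need to carry its own key alongside the value.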
Quick check: Are you on Prefect 3.x? If you're on 1.x/2.x, I can suggest the equivalent pattern for your version, but I'd recommend the 3.x approach above.