ASHIK
08/11/2025, 4:11 PM
Marvin
08/11/2025, 4:14 PM
```shell
prefect concurrency-limit create inner-task 5
```
Code:
```python
from prefect import flow, task

@task(tags=["inner-task"])
def inner(x):
    # ... do the thing ...
    return x

@flow
def parent(items):
    # The parent can submit as many as it wants; the tag limit lets only 5 run at a time across the workspace
    futures = [inner.submit(i) for i in items]
    return [f.result() for f in futures]
```
Option B — Global Concurrency Limits (GCL) from code
- Scope: fine-grained named “resources” you acquire in code; useful inside a task or subflow to gate critical sections.
- Create the limit, then use prefect.concurrency.* to occupy slots.
CLI (verified):
```shell
prefect global-concurrency-limit create io-db --limit 5
```
Async task:
```python
from prefect import task
from prefect.concurrency.asyncio import concurrency

@task
async def inner(x):
    async with concurrency(names="io-db", occupy=1, timeout_seconds=60):
        # Only 5 inners across the workspace can be in here at once
        # ... do async work ...
        pass
    return x
```
Sync task:
```python
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def inner(x):
    with concurrency(names="io-db", occupy=1, timeout_seconds=60):
        # ... do sync work ...
        pass
    return x
```
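If it helps to see what occupy and timeout_seconds mean mechanically, here is a plain-Python model of a single named limit. It is not Prefect's implementation (Prefect tracks the slots on the server, shared by every run in the workspace); SlotPool and concurrency_model are hypothetical names, and a threading.Condition stands in for the server so the semantics can be run locally: a caller takes occupy slots, waits while not enough are free, and gives up with TimeoutError once timeout_seconds elapses.

```python
import threading
import time
from contextlib import contextmanager
from typing import Iterator, Optional


class SlotPool:
    """Illustrative model of one named limit (hypothetical; not a Prefect API)."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.active = 0  # slots currently occupied
        self._cond = threading.Condition()

    def acquire(self, occupy: int = 1, timeout_seconds: Optional[float] = None) -> None:
        # Model choice: a request larger than the limit could never be granted, so fail fast.
        if occupy > self.limit:
            raise ValueError("occupy is larger than the limit")
        deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
        with self._cond:
            # Wait until enough slots are free, or give up at the deadline.
            while self.active + occupy > self.limit:
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    raise TimeoutError(f"could not occupy {occupy} slot(s) in time")
                self._cond.wait(remaining)
            self.active += occupy

    def release(self, occupy: int = 1) -> None:
        with self._cond:
            self.active -= occupy
            self._cond.notify_all()  # wake waiters so they re-check free slots


@contextmanager
def concurrency_model(pool: SlotPool, occupy: int = 1,
                      timeout_seconds: Optional[float] = None) -> Iterator[None]:
    """Hold `occupy` slots for the duration of the with-block, then return them."""
    pool.acquire(occupy, timeout_seconds)
    try:
        yield
    finally:
        pool.release(occupy)
```

With a limit of 5, one holder occupying 3 slots leaves 2 free, so a second request for 3 waits and then times out; the slots come back when the first block exits.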
Option C — Batch processing inside a subflow (local cap)
- Scope: limit within a single flow run by submitting in chunks; combine with a task runner to cap local parallelism.
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.collections import batched_iterable

@task
def inner(x):
    # ... work ...
    return x

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))  # local parallelism cap
def process_in_batches(items, batch_size=50):
    results = []
    for batch in batched_iterable(items, batch_size):
        futs = [inner.submit(i) for i in batch]  # submit a batch
        results.extend([f.result() for f in futs])  # wait for the whole batch before the next
    return results
```
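The batching pattern above can be sketched with only the standard library to see what it actually bounds. In this sketch concurrent.futures.ThreadPoolExecutor stands in for ThreadPoolTaskRunner, and batched() is a hypothetical stand-in for batched_iterable (assumed to yield tuples of up to batch_size items); an in-flight counter records the peak number of items running at once.

```python
import itertools
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    """Yield tuples of up to `size` items (hypothetical stand-in for batched_iterable)."""
    it = iter(items)
    while batch := tuple(itertools.islice(it, size)):
        yield batch


class InFlight:
    """Thread-safe counter of how many items are running right now, and the peak."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0
        self._lock = threading.Lock()

    def __enter__(self) -> None:
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)

    def __exit__(self, *exc: object) -> None:
        with self._lock:
            self.current -= 1


def process_in_batches(items: Iterable[int], batch_size: int = 50,
                       max_workers: int = 10) -> Tuple[List[int], int]:
    tracker = InFlight()

    def inner(x: int) -> int:
        with tracker:
            time.sleep(0.01)  # pretend to do some work
            return x * 2

    results: List[int] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:  # local parallelism cap
        for batch in batched(items, batch_size):
            futs = [pool.submit(inner, i) for i in batch]  # submit one batch
            results.extend(f.result() for f in futs)       # wait for the whole batch
    return results, tracker.peak
```

Because each batch is fully awaited before the next one is submitted, at most min(batch_size, max_workers) items run at once; the trade-off is that one slow item holds up its whole batch.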
Optional — Rate limiting
If you need “per second” pacing instead of simultaneous slot caps, create a GCL with slot decay and call rate_limit (from prefect.concurrency.sync or prefect.concurrency.asyncio) inside your task before each unit of work.
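Slot decay is easiest to reason about as a leaky bucket: each rate-limited call occupies a slot, and occupied slots drain away at a fixed rate instead of being released explicitly. The sketch below is a plain-Python model of that idea under that assumption, not Prefect's code; DecayingSlots and wait_for_slot are hypothetical names, and time is passed in explicitly so the behaviour is easy to check.

```python
import time


class DecayingSlots:
    """Leaky-bucket model of a limit with slot decay (hypothetical; not Prefect's code).

    Admitted calls occupy slots that are never explicitly released;
    occupied slots drain away at `decay_per_second` instead.
    """

    def __init__(self, limit: int, decay_per_second: float) -> None:
        self.limit = limit
        self.decay_per_second = decay_per_second
        self.occupied = 0.0
        self.last_seen = 0.0  # time (seconds) of the last update

    def _drain(self, now: float) -> None:
        elapsed = max(0.0, now - self.last_seen)
        self.occupied = max(0.0, self.occupied - elapsed * self.decay_per_second)
        self.last_seen = max(self.last_seen, now)

    def try_acquire(self, now: float, occupy: int = 1) -> bool:
        """Admit the call if `occupy` slots are free at time `now`."""
        self._drain(now)
        if self.occupied + occupy <= self.limit:
            self.occupied += occupy
            return True
        return False

    def seconds_until_free(self, now: float, occupy: int = 1) -> float:
        """How long a caller would wait before `occupy` slots drain free."""
        self._drain(now)
        excess = self.occupied + occupy - self.limit
        return max(0.0, excess / self.decay_per_second)


def wait_for_slot(bucket: DecayingSlots, occupy: int = 1) -> None:
    """Blocking helper in the spirit of rate_limit: sleep until the call is admitted."""
    if occupy > bucket.limit:
        raise ValueError("occupy is larger than the limit")
    while not bucket.try_acquire(time.monotonic(), occupy):
        time.sleep(bucket.seconds_until_free(time.monotonic(), occupy))
```

With limit=5 and a decay of 1 slot per second, a burst of 5 calls goes through immediately, after which roughly one call per second is admitted.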
Docs and references
- Docs: Global Concurrency Limits
- Blog: Mapping at scale (background on scaling and mapping)
A couple of quick clarifiers so I can point you to the simplest setup:
- Do you want the limit to apply across all flows in your workspace, or just within a single run?
- Are your “inner” tasks async or sync?
- When you say “nested tasks,” is the fan-out currently happening inside a task, or in a subflow? If it’s inside a task, I’ll show you how to move that orchestration into a subflow cleanly.
I know, concurrency always wants to be infinite until your database cries. We can keep it civil.
ASHIK
08/11/2025, 4:17 PM
Marvin
08/11/2025, 4:19 PM
```shell
prefect concurrency-limit create field-tasks 50
```
Code:
```python
from prefect import flow, task
from prefect.futures import as_completed

@task(tags=["field-tasks"], cache_result_in_memory=False, persist_result=False)
def process_field(field_id):
    # ... do the work ...
    return None  # or a small summary if you must keep something

@flow
def process_farm(farm):
    # Submit all fields, but only 50 will actually run at once due to the tag limit
    futures = [process_field.submit(fid) for fid in farm.fields]
    # Stream results to release memory as tasks finish
    for fut in as_completed(futures):
        _ = fut.result()  # handle minimal output if needed
        # drop references quickly; don't accumulate large lists

@flow
def process_account(farms):
    # Flows have no .submit(); calling the subflow directly runs farms one after another.
    # For parallel farms, one option is async flows awaited with asyncio.gather.
    for farm in farms:
        process_farm(farm)
```
Notes for memory:
- cache_result_in_memory=False prevents Prefect from retaining large results in the worker’s memory.
- persist_result=False avoids writing results to storage if you don’t need them later.
- Stream with as_completed(...) so you don’t hold a giant list of in-flight results.
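The streaming idea in the last note can be seen with the standard library's concurrent.futures.as_completed, used here as a stand-in for prefect.futures.as_completed. This is a sketch under that assumption: process_field returns a small hypothetical summary that is folded into a running total, and each future is dropped as soon as it has been consumed.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Iterable, Set, Tuple


def process_field(field_id: int) -> int:
    # Stand-in for real work: return a small summary, not a large object.
    return field_id % 7


def process_farm(field_ids: Iterable[int], max_workers: int = 8) -> Tuple[int, int]:
    done = 0
    total = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending: Set[Future] = {pool.submit(process_field, fid) for fid in field_ids}
        # as_completed works on its own copy of the set, so discarding from ours is safe.
        for fut in as_completed(pending):
            total += fut.result()  # fold the summary into a running total
            done += 1
            pending.discard(fut)   # drop our reference as soon as it is consumed
    return done, total
```

Only the running total survives the loop, so memory no longer grows with the number of fields.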
2) Per-run local cap + batching (keeps memory tight within each farm subflow)
- If you only want at most 50 in-flight per farm subflow (not across the whole workspace), submit in batches and/or cap the task runner.
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.collections import batched_iterable
from prefect.futures import as_completed

@task(cache_result_in_memory=False, persist_result=False)
def process_field(field_id):
    # ... work ...
    return None

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))
def process_farm_in_batches(farm, batch_size=50):
    for batch in batched_iterable(farm.fields, batch_size):
        futs = [process_field.submit(fid) for fid in batch]
        for f in as_completed(futs):
            _ = f.result()
```
Which to choose?
- Want a hard “no more than 50 field tasks running anywhere” rule? Use option 1 (tag concurrency limit).
- Want each farm subflow to run up to 50-at-a-time independently? Use option 2 (batching + task runner).
- If you want both global control and tight local memory, combine them: tag limit at 50 and batch per farm. The tag ensures a hard global cap; batching keeps each subflow’s in-flight futures reasonable.
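The reason to combine them is that per-subflow caps compound: if F farm subflows run in parallel, each capped at 50, up to 50 × F field tasks can be in flight at once, while a shared limit bounds the total regardless of F. The sketch below models this with the standard library only; threads stand in for subflows, a BoundedSemaphore shared by all farms stands in for the tag limit, and run_account is a hypothetical name.

```python
import contextlib
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


class Peak:
    """Thread-safe tracker of the maximum number of items running at once."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0
        self._lock = threading.Lock()

    def __enter__(self) -> None:
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)

    def __exit__(self, *exc: object) -> None:
        with self._lock:
            self.current -= 1


def run_account(n_farms: int, fields_per_farm: int, per_farm_cap: int,
                global_cap: Optional[int] = None) -> int:
    """Run every farm in parallel and return the peak number of field tasks in flight."""
    # One gate shared by all farms plays the role of the tag limit; None means no global cap.
    gate = (threading.BoundedSemaphore(global_cap) if global_cap is not None
            else contextlib.nullcontext())
    tracker = Peak()

    def process_field(field_id: int) -> None:
        with gate, tracker:  # take a global slot first, then count as running
            time.sleep(0.01)  # pretend to do some work

    def process_farm(farm_id: int) -> None:
        # Each farm gets its own pool, like a per-subflow ThreadPoolTaskRunner cap.
        with ThreadPoolExecutor(max_workers=per_farm_cap) as pool:
            list(pool.map(process_field, range(fields_per_farm)))

    with ThreadPoolExecutor(max_workers=n_farms) as farms:
        list(farms.map(process_farm, range(n_farms)))
    return tracker.peak
```

Without the shared gate, the peak with 3 farms capped at 4 each can climb toward 3 × 4 = 12; with a gate of 5 it never exceeds 5, however many farms run.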
Docs for reference:
- Global concurrency concepts (also useful background): Global Concurrency Limits
Quick clarifier so I can tailor this exactly:
- Do you need the “50” limit across all farms combined, or 50 per farm subflow?
- Are field tasks producing large results you don’t need to keep? If so, we can aggressively disable result persistence and memory caching as shown.
ASHIK
08/11/2025, 4:20 PM
Marvin
08/11/2025, 4:21 PM