ASHIK
08/11/2025, 4:11 PM

Marvin
08/11/2025, 4:11 PM

Marvin
08/11/2025, 4:14 PM
Tag-based concurrency limit: create the limit, then put the matching tag on the task:

prefect concurrency-limit create inner-task 5

from prefect import flow, task
@task(tags=["inner-task"])
def inner(x):
    # ... do the thing ...
    return x
@flow
def parent(items):
    # parent can submit as many as it wants; the tag enforces 5 at a time across the workspace
    futures = [inner.submit(i) for i in items]
    return [f.result() for f in futures]

Alternatively, use a global concurrency limit (the prefect.concurrency.* helpers). Create the limit, then occupy a slot inside the task:

prefect global-concurrency-limit create io-db --limit 5

from prefect import task
from prefect.concurrency.asyncio import concurrency
@task
async def inner(x):
    async with concurrency(names="io-db", occupy=1, timeout_seconds=60):
        # only 5 inners across the workspace can be in here at once
        # ... do async work ...
        return x

The synchronous version of the same pattern:

from prefect import task
from prefect.concurrency.sync import concurrency
@task
def inner(x):
    with concurrency(names="io-db", occupy=1, timeout_seconds=60):
        # ... do sync work ...
        return x

To also cap local parallelism and submit work in batches:

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.collections import batched_iterable
@task
def inner(x):
    # ... work ...
    return x
@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))  # local parallelism cap
def process_in_batches(items, batch_size=50):
    results = []
    for batch in batched_iterable(items, batch_size):
        futs = [inner.submit(i) for i in batch]   # submit a batch
        results.extend([f.result() for f in futs])  # wait for the batch
    return results

There is also rate_limit if you want to pace how quickly work starts, rather than cap how many tasks run at once.
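A minimal sketch of that approach, assuming a global concurrency limit named api-calls already exists with slot decay configured (the limit name and task body here are illustrative):

from prefect import flow, task
from prefect.concurrency.sync import rate_limit

@task
def call_service(x):
    # ... talk to the rate-limited service ...
    return x

@flow
def throttled(items):
    futures = []
    for item in items:
        # Blocks until a slot is free on the "api-calls" limit, so
        # submissions are paced by that limit's slot decay rate.
        rate_limit("api-calls", occupy=1)
        futures.append(call_service.submit(item))
    return [f.result() for f in futures]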
ASHIK
08/11/2025, 4:17 PM

Marvin
08/11/2025, 4:17 PM

Marvin
08/11/2025, 4:19 PM
Create a tag-based limit of 50, tag the task, and stream results as tasks finish so memory is released early:

prefect concurrency-limit create field-tasks 50

from prefect import flow, task
from prefect.futures import as_completed
@task(tags=["field-tasks"], cache_result_in_memory=False, persist_result=False)
def process_field(field_id):
    # ... do the work ...
    return None  # or a small summary if you must keep something
@flow
def process_farm(farm):
    # Submit all fields, but only 50 will actually run at once due to the tag limit
    futures = [process_field.submit(fid) for fid in farm.fields]
    # Stream results to release memory as tasks finish
    for fut in as_completed(futures):
        _ = fut.result()  # handle minimal output if needed
        # drop references quickly; don’t accumulate large lists
@flow
def process_account(farms):
    # run farms however you like (parallel or not)
    for farm in farms:
        process_farm(farm)

The key pieces are cache_result_in_memory=False and persist_result=False on the task, plus consuming futures with as_completed(...) so results are dropped as soon as they finish. If you would rather also cap local threads and work through the fields in batches:

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.collections import batched_iterable
from prefect.futures import as_completed
@task(cache_result_in_memory=False, persist_result=False)
def process_field(field_id):
    # ... work ...
    return None
@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))
def process_farm_in_batches(farm, batch_size=50):
    for batch in batched_iterable(farm.fields, batch_size):
        futs = [process_field.submit(fid) for fid in batch]
        for f in as_completed(futs):
            _ = f.result()

ASHIK
08/11/2025, 4:20 PM

Marvin
08/11/2025, 4:20 PM

Marvin
08/11/2025, 4:21 PM