# ask-marvin
a
@Marvin, I have nested tasks. I want to set concurrency or batch processing of a task within a nested task.
m
thought for 195.6 seconds
Got it. You have a task that kicks off "inner" work and you want to cap how much runs at once or process it in batches. In Prefect 3.x you have a few solid options, depending on whether you want the limit to apply within just one flow run or across your whole workspace.

Important note up front: don't orchestrate tasks from inside tasks. Put the fan-out/fan-in logic in a subflow. Tasks can do work; flows orchestrate.

Option A: Orchestration-level task concurrency (tags)
- Scope: applies across all flows/runs in your workspace for tasks with a given tag.
- Set the limit once with the CLI, then tag the "inner" task.

CLI (verified):
```bash
prefect concurrency-limit create inner-task 5
```
Code:
```python
from prefect import flow, task

@task(tags=["inner-task"])
def inner(x):
    # ... do the thing ...
    return x

@flow
def parent(items):
    # parent can submit as many as it wants; the tag enforces 5 at a time across the workspace
    futures = [inner.submit(i) for i in items]
    return [f.result() for f in futures]
```

Option B: Global Concurrency Limits (GCL) from code
- Scope: fine-grained, named "resources" you acquire in code; useful inside a task or subflow to gate critical sections.
- Create the limit, then use `prefect.concurrency.*` to occupy slots.

CLI (verified):
```bash
prefect global-concurrency-limit create io-db --limit 5
```

Async task:
```python
from prefect import task
from prefect.concurrency.asyncio import concurrency

@task
async def inner(x):
    async with concurrency(names="io-db", occupy=1, timeout_seconds=60):
        # only 5 inners across the workspace can be in here at once
        # ... do async work ...
        return x
```

Sync task:
```python
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def inner(x):
    with concurrency(names="io-db", occupy=1, timeout_seconds=60):
        # ... do sync work ...
        return x
```

Option C: Batch processing inside a subflow (local cap)
- Scope: limit within a single flow run by submitting in chunks; combine with a task runner to cap local parallelism.
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.collections import batched_iterable

@task
def inner(x):
    # ... work ...
    return x

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))  # local parallelism cap
def process_in_batches(items, batch_size=50):
    results = []
    for batch in batched_iterable(items, batch_size):
        futs = [inner.submit(i) for i in batch]   # submit a batch
        results.extend([f.result() for f in futs])  # wait for the batch
    return results
```

Optional: rate limiting
If you need "per second" pacing instead of simultaneous slot caps, create a GCL with slot decay and call `rate_limit` inside your task before each unit of work (a minimal sketch follows below).
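A minimal sketch of that pattern, assuming a GCL named `api-calls` (a placeholder name) already exists with slot decay configured, e.g. `prefect gcl create api-calls --limit 10 --slot-decay-per-second 1.0` (check `prefect gcl create --help` on your version for the exact flags):
```python
from prefect import flow, task
from prefect.concurrency.sync import rate_limit

@task
def call_api(item):
    # blocks until a slot frees up on the decaying "api-calls" limit,
    # pacing how often this unit of work actually starts
    rate_limit("api-calls")
    # ... do the rate-limited work ...
    return item

@flow
def paced_flow(items):
    futures = [call_api.submit(i) for i in items]
    return [f.result() for f in futures]
```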
Docs and references
- Global concurrency limits: Docs: Global Concurrency Limits
- Background on scaling/mapping: Blog: Mapping at scale

A couple of quick clarifiers so I can point you to the simplest setup:
- Do you want the limit to apply across all flows in your workspace, or just within a single run?
- Are your "inner" tasks async or sync?
- When you say "nested tasks," is the fan-out currently happening inside a task, or in a subflow? If it's inside a task, I'll show you how to move that orchestration into a subflow cleanly.

I know, concurrency always wants to be infinite until your database cries. We can keep it civil.
a
@Marvin I have an account-level flow which executes farm level then field level. Each farm can contain 1 to 100 fields; since they run in parallel I don't have control. I want to limit field-level execution to 50 and release memory.
m
thought for 87.2 seconds
Understood. You have an account flow -> farm subflow -> field task pattern, and you want to cap field-level concurrency at 50 while not hoarding memory. Two practical ways to do this in Prefect 3.x:

1) Workspace-wide cap (recommended): tag-based task concurrency limit
- This limits all field tasks across all farms and runs to at most 50 Running at a time, no matter how many farms you fan out.

CLI (verified):
```bash
prefect concurrency-limit create field-tasks 50
```

Code:
```python
from prefect import flow, task
from prefect.futures import as_completed

@task(tags=["field-tasks"], cache_result_in_memory=False, persist_result=False)
def process_field(field_id):
    # ... do the work ...
    return None  # or a small summary if you must keep something

@flow
def process_farm(farm):
    # Submit all fields, but only 50 will actually run at once due to the tag limit
    futures = [process_field.submit(fid) for fid in farm.fields]

    # Stream results to release memory as tasks finish
    for fut in as_completed(futures):
        _ = fut.result()  # handle minimal output if needed
        # drop references quickly; don’t accumulate large lists

@flow
def process_account(farms):
    # run farms however you like (parallel or not)
    for farm in farms:
        process_farm.submit(farm)
```

Notes for memory:
- `cache_result_in_memory=False` prevents Prefect from retaining large results in the worker's memory.
- `persist_result=False` avoids writing results to storage if you don't need them later.
- Stream with `as_completed(...)` so you don't hold a giant list of in-flight results.

2) Per-run local cap + batching (keeps memory tight within each farm subflow)
- If you only want at most 50 in-flight per farm subflow (not across the whole workspace), submit in batches and/or cap the task runner.
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.collections import batched_iterable
from prefect.futures import as_completed

@task(cache_result_in_memory=False, persist_result=False)
def process_field(field_id):
    # ... work ...
    return None

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))
def process_farm_in_batches(farm, batch_size=50):
    for batch in batched_iterable(farm.fields, batch_size):
        futs = [process_field.submit(fid) for fid in batch]
        for f in as_completed(futs):
            _ = f.result()
```

Which to choose?
- Want a hard "no more than 50 field tasks running anywhere" rule? Use option 1 (tag concurrency limit).
- Want each farm subflow to run up to 50-at-a-time independently? Use option 2 (batching + task runner).
- If you want both global control and tight local memory, combine them: tag limit at 50 and batch per farm. The tag ensures a hard global cap; batching keeps each subflow's in-flight futures reasonable (see the combined sketch below).

Docs for reference:
- Global concurrency concepts (also useful background): Global Concurrency Limits

Quick clarifier so I can tailor this exactly:
- Do you need the "50" limit across all farms combined, or 50 per farm subflow?
- Are field tasks producing large results you don't need to keep? If so, we can aggressively disable result persistence and memory caching as shown.
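For the combined route mentioned above, a minimal sketch (the flow name is illustrative) that reuses the `field-tasks` tag limit of 50 from option 1 together with the per-farm batching from option 2:
```python
from prefect import flow, task
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.collections import batched_iterable

@task(tags=["field-tasks"], cache_result_in_memory=False, persist_result=False)
def process_field(field_id):
    # ... do the per-field work ...
    return None

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))  # local cap inside this farm subflow
def process_farm_combined(farm, batch_size=50):
    # the "field-tasks" tag limit (50) is the hard workspace-wide cap;
    # batching keeps this subflow's in-flight futures and memory bounded
    for batch in batched_iterable(farm.fields, batch_size):
        futures = [process_field.submit(fid) for fid in batch]
        for fut in as_completed(futures):
            fut.result()  # drop references as results stream in
```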
a
@Marvin cache_result_in_memory=False, persist_result=False: what does this do?
m
thought for 68.2 seconds
Short version:
- cache_result_in_memory=False: don't keep the task's return value in the worker's RAM after it finishes.
- persist_result=False: don't write the task's return value to result storage (disk/cloud). In other words, no durable artifact is saved.

What that means in practice:
- With cache_result_in_memory=False:
  - Downstream tasks can still depend on the future; Prefect will resolve it, but it won't retain a big Python object in memory once the task completes.
  - If persist_result is also False, Prefect won't be able to re-load the value later from storage; resolving the future just confirms success and returns the in-memory value at that moment, then it's eligible for GC when references drop.
- With persist_result=False:
  - Nothing is saved to a result store, so you cannot re-fetch the result after the process ends or from another run.
  - Good for memory/disk savings when you don't need the output later.

Defaults in Prefect 3.x:
- cache_result_in_memory defaults to True.
- persist_result defaults to False unless you've configured defaults to persist.

When to use them:
- For high fan-out workloads where you don't need large per-item outputs, set both to False to minimize memory and storage usage.
- If you need to reuse results across runs or after the worker process ends, set persist_result=True and configure result_storage; otherwise keep it False.

If your field tasks return large data, also consider returning a small summary or a path/ID to an external artifact you control, rather than the full payload (a minimal sketch follows below). Keeps the RAM gremlins from throwing a party.
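To make that last suggestion concrete, here is a minimal sketch of the "small summary plus pointer" pattern; the payload, output path, and field IDs are made up for illustration and are not a specific Prefect API:
```python
import json
from pathlib import Path

from prefect import task

@task(cache_result_in_memory=False, persist_result=False)
def process_field(field_id: str, out_dir: str = "/tmp/field-results") -> dict:
    # stand-in for the large per-field payload you don't want to keep around
    rows = [{"field_id": field_id, "value": i} for i in range(100_000)]

    # write the bulky data to storage you control...
    out_path = Path(out_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    out_file = out_path / f"{field_id}.json"
    out_file.write_text(json.dumps(rows))

    # ...and return only a small summary plus a pointer to it; with both flags
    # set to False, Prefect keeps neither the rows nor this dict around in RAM
    # or result storage once references are dropped
    return {"field_id": field_id, "row_count": len(rows), "output_path": str(out_file)}
```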