Idriss Bellil
09/12/2025, 12:05 PM
Marvin
09/12/2025, 12:05 PM
Marvin
09/12/2025, 12:13 PM
.submit and uses wait_for for ordering.
- Call that helper from your parent flow as many times as you need; each call returns futures you can manage independently.
- Collect results with as_completed or by resolving .result() when you need them.
Why this works well:
- Flows orchestrate tasks; tasks should not orchestrate other tasks. Avoid “a task that calls other tasks.” Keep orchestration in the flow (or a plain function called by the flow).
- Subflows called directly will block the parent flow call (i.e., they don’t run concurrently with each other within the same parent). If you need concurrent “flow-like” runs, use deployments + run_deployment(as_subflow=False) instead.
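To make the blocking-versus-submitted distinction concrete, here is a plain-Python analogy built on `concurrent.futures` rather than Prefect itself. The `work` function, its sleep duration, and the helper names are all made up for illustration; the sketch only shows that direct calls run one after another while submitted calls overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(x: int) -> int:
    """Hypothetical stand-in for a subflow or task body."""
    time.sleep(0.2)  # simulated work
    return x * 2


def run_blocking(items: list[int]) -> list[int]:
    # Each call finishes before the next one starts, like direct subflow calls.
    return [work(i) for i in items]


def run_submitted(items: list[int]) -> list[int]:
    # All calls are handed to the pool first and resolved afterwards,
    # which is roughly what submitting tasks and calling .result() later does.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(work, i) for i in items]
        return [f.result() for f in futures]


def timed(fn, items: list[int]):
    """Return (result, elapsed_seconds) for fn(items)."""
    start = time.perf_counter()
    result = fn(items)
    return result, time.perf_counter() - start
```

Timing both variants on the same input with `timed` should show the submitted version finishing in roughly the time of a single call, while the blocking version takes the sum of all calls.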
Example pattern (reusable task group with concurrency)
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed

@task
def task_1(x): ...

@task
def task_2(y): ...

@task
def task_3(a, b): ...

# Reusable group: submit tasks and return futures
def group_a(x):
    f1 = task_1.submit(x)
    f2 = task_2.submit(x)
    f3 = task_3.submit(f1, f2, wait_for=[f1, f2])  # explicit dependency
    return {"t1": f1, "t2": f2, "final": f3}

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16))  # tune as needed
def main(items: list[int]):
    # Launch many groups concurrently
    groups = [group_a(i) for i in items]

    # Option 1: wait on each group's final result
    results = [g["final"].result() for g in groups]

    # Option 2: stream results as tasks finish across all groups
    all_futures = [g["final"] for g in groups]
    for fut in as_completed(all_futures):
        val = fut.result()
        # handle result as it completes

    return results
Notes and best practices
- Plain Python wrapper vs subflow: The wrapper function preserves full concurrency of submitted tasks and is easy to reuse. Subflows are fine for conceptual separation, but direct subflow calls block the parent until each finishes.
- Task from task: Avoid this. How a task called from inside another task behaves depends on your Prefect version (older releases raise an error, newer ones run it as a nested task run), so don't rely on it for orchestration. Keep orchestration in flows or helper functions called by flows.
- Dependencies: Use wait_for=[...] to enforce ordering between submitted tasks. If a task has no upstream dependency, it starts as soon as the task runner has capacity.
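The ordering rule in the last note can be pictured with a small plain-Python analogy. This is not Prefect code: `submit_after`, `step`, and `events` are hypothetical names, and the standard-library thread pool has no `wait_for` argument, so the sketch emulates it by resolving the upstream futures before running the downstream function.

```python
from concurrent.futures import Future, ThreadPoolExecutor

events: list[str] = []  # records the order in which steps actually run


def step(name: str) -> str:
    """Hypothetical task body that logs its own execution."""
    events.append(name)
    return name


def submit_after(pool: ThreadPoolExecutor, upstream: list[Future], fn, *args) -> Future:
    """Submit fn so that it runs only after every upstream future has finished."""

    def runner():
        for f in upstream:
            f.result()  # block until this upstream future is done
        return fn(*args)

    return pool.submit(runner)


with ThreadPoolExecutor(max_workers=4) as pool:
    f1 = pool.submit(step, "task_1")  # no upstream: starts as soon as a worker is free
    f2 = pool.submit(step, "task_2")  # no upstream: may run alongside task_1
    f3 = submit_after(pool, [f1, f2], step, "task_3")  # ordered after both
    f3.result()
```

The first two steps may run in either order, but the third is always recorded last, which is the guarantee the dependency list is meant to provide.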
Concurrency controls you may want
- Task runner parallelism: Choose and tune at the flow level.
@flow(task_runner=ThreadPoolTaskRunner(max_workers=16))
def main(...): ...
See Task runners.
- Global concurrency limits (server-enforced across runs): Tag tasks and set limits via CLI. For example:
@task(tags=["db"])
def write_to_db(...): ...
# CLI
prefect concurrency-limit create db 4
Inspect with:
prefect concurrency-limit inspect db
Docs: Global concurrency limits.
- Code-based concurrency (grab slots in code for critical sections):
from prefect.concurrency.sync import concurrency

@task
def do_io():
    with concurrency("io", occupy=1):
        ...
Signature reference for the context manager is available in code; see “Concurrency” section in the guide above.
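The idea of occupying a slot around a critical section can be sketched with an in-process semaphore. This is only an analogy under stated assumptions: the limit of 2, the `do_io` body, and the `state` bookkeeping are invented for the example, and a semaphore coordinates threads in a single process, whereas the Prefect limit is coordinated outside your code.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

io_slots = threading.BoundedSemaphore(2)  # hypothetical limit: 2 slots
lock = threading.Lock()
state = {"active": 0, "peak": 0}  # tracks how many workers hold a slot


def do_io(i: int) -> int:
    with io_slots:  # occupy one slot for the duration of the critical section
        with lock:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(0.05)  # simulated I/O
        with lock:
            state["active"] -= 1
    return i


# Eight workers compete, but at most two are inside the critical section at once.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(do_io, range(8)))
```

Even with eight worker threads available, `state["peak"]` never exceeds the slot count, so the limit rather than the pool size bounds the critical section.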
Collecting results
- Stream results as they finish with `as_completed`:
from prefect.futures import as_completed

for fut in as_completed(futures):
    print(fut.result())
Guide: Run work concurrently.
When you truly want parallel subflows
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
async def parent(items):
    runs = [
        await run_deployment(
            name="GroupA/deploy-name",
            parameters={"x": i},
            timeout=0,  # return immediately; the default waits for the run to finish
            as_subflow=False,  # run as separate flow runs
        )
        for i in items
    ]
    for r in runs:
        # wait_for_flow_run is a coroutine in current Prefect 3 releases, hence the async flow
        await wait_for_flow_run(r.id)  # or poll selectively
Docs: Run deployments from flows.
References
- Task runners
- Run work concurrently (submit, as_completed)
- Global concurrency limits
- Run deployments from flows
Does the helper-function approach cover your use case, or do you need process/infra isolation for each group (in which case deployments might be better)?