# ask-marvin
@Marvin For the sake of grouping multiple tasks and re-using them, what's the best way to handle that while still allowing for concurrency (e.g., letting the same group run multiple times concurrently)? An example: task_1, task_2, and task_3 need to be grouped as group_a (options here are a subflow, a simple Python function, a task that calls other tasks, or something else). In the same main flow, I want to be able to spin up multiple concurrent runs of group_a (it seems this isn't possible with subflows or with simple Python functions that call tasks). One idea I haven't tested yet is to make the group a Python function that calls submit for everything, with an explicit dependency in each call like wait_for=...
thought for 504.4 seconds
Great question. In Prefect 3.x, the cleanest way to create a reusable “group” of tasks that you can run many times concurrently within the same flow is:

- Define the group as a plain Python helper function that orchestrates tasks with `.submit` and uses `wait_for` for ordering.
- Call that helper from your parent flow as many times as you need; each call returns futures you can manage independently.
- Collect results with `as_completed` or by resolving `.result()` when you need them.

Why this works well:

- Flows orchestrate tasks; tasks should not orchestrate other tasks. Avoid “a task that calls other tasks.” Keep orchestration in the flow (or in a plain function called by the flow).
- Subflows called directly block the parent flow call (i.e., they don’t run concurrently with each other within the same parent). If you need concurrent “flow-like” runs, use deployments plus `run_deployment(as_subflow=False)` instead.

Example pattern (reusable task group with concurrency):
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed

@task
def task_1(x): ...
@task
def task_2(y): ...
@task
def task_3(a, b): ...

# Reusable group: submit tasks and return futures
def group_a(x):
    f1 = task_1.submit(x)
    f2 = task_2.submit(x)
    f3 = task_3.submit(f1, f2, wait_for=[f1, f2])  # explicit dependency
    return {"t1": f1, "t2": f2, "final": f3}

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16))  # tune as needed
def main(items: list[int]):
    # Launch many groups concurrently
    groups = [group_a(i) for i in items]

    # Option 1: wait on each group's final result
    results = [g["final"].result() for g in groups]

    # Option 2: stream results as tasks finish across all groups
    all_futures = [g["final"] for g in groups]
    for fut in as_completed(all_futures):
        val = fut.result()
        # handle result as it completes
    return results
```
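If some tasks in a group are not consumed by the group's final task, waiting only on the finals can leave their failures unsurfaced. A minimal sketch of a variant that waits on every future from every group, assuming `prefect.futures.wait` is available in your Prefect version (it mirrors `concurrent.futures.wait`); the helper name is hypothetical:

```python
from prefect.futures import wait

def wait_for_groups(groups: list[dict]) -> None:
    # Gather every future from every group, not just each group's final task.
    all_futures = [fut for group in groups for fut in group.values()]
    done, _ = wait(all_futures)  # blocks until all futures reach a final state
    for fut in done:
        fut.result()             # re-raises any task exception
```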
Notes and best practices:

- Plain Python wrapper vs. subflow: the wrapper function preserves full concurrency of submitted tasks and is easy to reuse. Subflows are fine for conceptual separation, but direct subflow calls block the parent until each one finishes.
- Task from task: don’t do this. If you call a Prefect task from inside another task, it executes like a normal function (not tracked or orchestrated). Keep orchestration in flows or in helper functions called by flows.
- Dependencies: use `wait_for=[...]` to enforce ordering between submitted tasks. With no upstream dependency, tasks start as soon as the task runner has capacity.

Concurrency controls you may want:

- Task runner parallelism: choose and tune at the flow level.
```python
@flow(task_runner=ThreadPoolTaskRunner(max_workers=16))
def main(...): ...
```
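If thread-based concurrency isn't enough (for example, CPU-bound groups), a Dask-backed task runner is one alternative. A minimal sketch, assuming the `prefect-dask` integration is installed; the cluster settings are illustrative only:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

# Run this flow's submitted tasks on a temporary local Dask cluster
# with 4 worker processes.
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4}))
def main_dask(items: list[int]):
    ...
```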
See the Task runners docs.

- Global concurrency limits (server-enforced across runs): tag tasks and set limits via the CLI. For example:
```python
@task(tags=["db"])
def write_to_db(...): ...
```

Then create the limit from the CLI:

```bash
prefect concurrency-limit create db --limit 4
```
Inspect with:
```bash
prefect concurrency-limit inspect db
```

Docs: Global concurrency limits.

- Code-based concurrency (grab slots in code for critical sections):
```python
from prefect.concurrency.sync import concurrency

@task
def do_io():
    with concurrency("io", occupy=1):
        ...
```
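For async tasks, the same slot-based guard has an async counterpart. A minimal sketch, assuming `prefect.concurrency.asyncio` exposes an async `concurrency` context manager mirroring the sync one above:

```python
from prefect import task
from prefect.concurrency.asyncio import concurrency

@task
async def do_io_async():
    # Occupy one slot on the "io" limit for the duration of the block.
    async with concurrency("io", occupy=1):
        ...
```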
The signature of the `concurrency` context manager is documented in the source; see the “Concurrency” section of the guide above.

Collecting results:

- Stream results as they finish with `as_completed`:
```python
from prefect.futures import as_completed

for fut in as_completed(futures):
    print(fut.result())
```
Guide: Run work concurrently.

When you truly want parallel subflows:
- If you want each “group” to be its own flow run (isolation, separate infra, or different work pools), create a deployment and launch multiple in parallel from the parent via `run_deployment(as_subflow=False)`:
```python
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
def parent(items):
    runs = [
        run_deployment(
            name="GroupA/deploy-name",
            parameters={"x": i},
            as_subflow=False,  # run as separate flow runs
        )
        for i in items
    ]
    for r in runs:
        wait_for_flow_run(r.id)  # or poll selectively
```
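If the parent should fail when any group run fails, you can gate on the children's final states. A minimal sketch, assuming `wait_for_flow_run` returns the finished `FlowRun` whose `state` exposes `is_completed()`; the helper name is hypothetical:

```python
from prefect.flow_runs import wait_for_flow_run

def wait_for_group_runs(runs) -> None:
    # Block until each child flow run reaches a final state, then fail loudly
    # if any of them did not complete successfully.
    final = [wait_for_flow_run(r.id) for r in runs]
    failed = [fr for fr in final if fr.state is None or not fr.state.is_completed()]
    if failed:
        raise RuntimeError(f"{len(failed)} group run(s) did not finish successfully")
```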
Docs: Run deployments from flows.

References:

- Task runners
- Run work concurrently (`submit`, `as_completed`)
- Global concurrency limits
- Run deployments from flows

Does the helper-function approach cover your use case, or do you need process/infra isolation for each group (in which case deployments might be better)?