Giovanni Furetti
10/21/2025, 4:08 PM

Marvin
10/21/2025, 4:08 PM
from typing import Iterable, Tuple
from prefect import flow, task, get_run_logger
@task
def do_work(a: int, b: int) -> int:
    # Put the compute/IO in tasks (best practice)
    return a + b
@flow(name="compute-sum-subflow")
def compute_sum(a: int, b: int) -> int:
    logger = get_run_logger()
    logger.info(f"Starting subflow with a={a}, b={b}")
    result = do_work.submit(a, b).result()  # submit for orchestration; get the result
    logger.info(f"Subflow result: {result}")
    return result
@flow(name="parent-flow")
def parent_flow(param_sets: Iterable[Tuple[int, int]] = ((1, 2), (10, 20), (5, 7))) -> list[int]:
    logger = get_run_logger()
    logger.info("Invoking subflow multiple times with different parameters")
    results: list[int] = []
    for a, b in param_sets:
        # Each call creates a child flow run visible in the UI
        r = compute_sum(a=a, b=b)
        results.append(r)
    logger.info(f"All subflow results: {results}")
    return results
if __name__ == "__main__":
    parent_flow()
Notes
- Subflows are just flows called from another flow; each call creates a child flow run.
- Keep heavy logic or side effects in tasks, and use type hints for clarity.
- Use .submit(...).result() inside flows to orchestrate task execution and get the return value.
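For contrast, here is a minimal sketch (task and flow names are illustrative, not from the code above) of a direct task call versus .submit() inside a flow:
from prefect import flow, task

@task
def add(x: int, y: int) -> int:
    return x + y

@flow
def demo() -> int:
    value = add(1, 2)          # direct call: runs the task and returns its value
    future = add.submit(3, 4)  # submit: returns a PrefectFuture scheduled on the task runner
    return value + future.result()  # resolve the future when you need the value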
Docs
- Subflows
- Tasks

Giovanni Furetti
10/21/2025, 4:09 PM

Marvin
10/21/2025, 4:14 PM
from typing import Iterable, Tuple, List
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
# For CPU-bound work, consider: from prefect.task_runners import ProcessPoolTaskRunner
@task
def do_work(a: int, b: int) -> int:
    # Put heavy/IO work in tasks
    return a + b
@flow(name="compute-sum-subflow")
def compute_sum(a: int, b: int) -> int:
    logger = get_run_logger()
    logger.info(f"Starting subflow with a={a}, b={b}")
    result = do_work.submit(a, b).result()
    logger.info(f"Subflow result: {result}")
    return result
@task
def invoke_subflow(a: int, b: int) -> int:
    # Wrap the subflow so we can submit many in parallel
    return compute_sum(a=a, b=b)
@flow(
    name="parent-flow",
    task_runner=ThreadPoolTaskRunner(max_workers=8),  # adjust for your workload
    # For CPU-bound work, you can use: task_runner=ProcessPoolTaskRunner(max_workers=8)
)
def parent_flow(
    param_sets: Iterable[Tuple[int, int]] = ((1, 2), (10, 20), (5, 7))
) -> List[int]:
    logger = get_run_logger()
    logger.info("Submitting subflows concurrently")
    futures = [invoke_subflow.submit(a, b) for a, b in param_sets]
    results = [f.result() for f in futures]
    logger.info(f"All subflow results: {results}")
    return results
if __name__ == "__main__":
    parent_flow()
Why this pattern
- Subflows are synchronous when called directly; wrapping in a task lets you submit many subflow invocations concurrently under the parent’s task runner.
- Keep heavy logic in tasks; use ThreadPoolTaskRunner for IO-bound or ProcessPoolTaskRunner for CPU-bound concurrency.
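An alternative to the task-wrapper, if you are comfortable with async: define the flows as async and gather the subflow coroutines. A minimal sketch (flow names here are illustrative):
import asyncio
from prefect import flow

@flow(name="compute-sum-subflow-async")
async def compute_sum_async(a: int, b: int) -> int:
    return a + b

@flow(name="parent-flow-async")
async def parent_flow_async() -> list[int]:
    # Each awaited call is still a child flow run; gather executes them concurrently
    return await asyncio.gather(
        compute_sum_async(1, 2),
        compute_sum_async(10, 20),
        compute_sum_async(5, 7),
    )

if __name__ == "__main__":
    asyncio.run(parent_flow_async())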
Related docs
- Subflows
- Task runners and concurrency
Optional: If you need each subflow to run on its own infrastructure/worker, create a deployment for the subflow and trigger them concurrently via tasks that call run_deployment, but the above is the simplest in-process concurrent pattern.

Giovanni Furetti
10/22/2025, 6:30 AM

Marvin
10/22/2025, 6:33 AM
1) Submit infrastructure-bound flows (no deployment required)
- Binds a flow to a work pool and submits it like a task, returning futures you can resolve for results.
- Best when you want Python return values back in the parent without creating deployments.
from typing import Iterable, Tuple, List
from prefect import flow, task, get_run_logger
from prefect.flows import bind_flow_to_infrastructure
from prefect.workers.process import ProcessWorker
from prefect.futures import wait
@task
def do_work(a: int, b: int) -> int:
    return a + b
@flow(name="compute-sum-subflow")
def compute_sum(a: int, b: int) -> int:
    logger = get_run_logger()
    logger.info(f"Starting subflow with a={a}, b={b}")
    return do_work.submit(a, b).result()
@flow(name="parent-flow")
def parent_flow(
    param_sets: Iterable[Tuple[int, int]] = ((1, 2), (10, 20), (5, 7))
) -> List[int]:
    # Bind the subflow to a work pool so we can submit it concurrently
    # Ensure a Process work pool named "process-pool" exists and a worker is running
    ibf = bind_flow_to_infrastructure(
        compute_sum,
        work_pool="process-pool",
        worker_cls=ProcessWorker,
    )
    futures = [ibf.submit(a=a, b=b) for a, b in param_sets]
    wait(futures)  # optional: block until completed
    return [f.result() for f in futures]
if __name__ == "__main__":
    parent_flow()
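Before running this, create the Process work pool and start a worker against it (the pool name must match the code above). With the standard Prefect 3 CLI:
prefect work-pool create process-pool --type process
prefect worker start --pool process-pool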
2) Submit flows to the Runner (no deployment required)
- Fire-and-forget style submission that returns FlowRun objects; you can wait for all submitted runs.
- Best when you don’t need immediate Python return values (if you do, prefer the infrastructure-bound flow pattern above).
from typing import Iterable, Tuple
from prefect import flow, get_run_logger
from prefect.runner import submit_to_runner, wait_for_submitted_runs
@flow
def compute_sum(a: int, b: int) -> int:
    return a + b
@flow
def parent_flow(param_sets: Iterable[Tuple[int, int]] = ((1, 2), (10, 20), (5, 7))):
    logger = get_run_logger()
    params = [{"a": a, "b": b} for a, b in param_sets]
    # Submit many subflow runs concurrently
    runs = submit_to_runner(compute_sum, parameters=params)
    logger.info(f"Submitted {len(runs)} child runs")
    # Wait for all submitted runs tied to this parent to finish
    wait_for_submitted_runs()
    logger.info("All child runs completed")
    return [r.id for r in runs]
When to choose which
- Want futures and easy result values with no deployments? Use the infrastructure-bound flow .submit() pattern.
- Want to fan out runs without futures/return values (or you plan to read results from storage/state)? Use submit_to_runner.
- Need each subflow on its own deployment/infrastructure? Create a deployment for the subflow and trigger many with run_deployment in parallel (typically via tasks or threads).
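For that deployment route, a minimal sketch using run_deployment from prefect.deployments; the deployment name "compute-sum-subflow/default" is hypothetical and must already exist:
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger_subflow(a: int, b: int):
    # Blocks until the child flow run reaches a terminal state, then returns the FlowRun object
    return run_deployment(
        name="compute-sum-subflow/default",  # hypothetical: "flow-name/deployment-name"
        parameters={"a": a, "b": b},
    )

@flow
def parent_flow_via_deployments(param_sets=((1, 2), (10, 20), (5, 7))):
    # Submitting the wrapper task fans out many deployment-backed runs in parallel
    futures = [trigger_subflow.submit(a, b) for a, b in param_sets]
    return [f.result() for f in futures]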
Docs
- Run work concurrently
- Submit flows directly to dynamic infrastructure
- Runner submission API