# marvin-ai
@Marvin write me an example in Prefect 3.0 of a flow that invokes the same subflow multiple times with different parameters. The subflow takes two input parameters. Do it using Prefect’s recommended best practices.
Here’s a concise, best-practices Prefect 3.x example where a parent flow calls the same subflow multiple times with different parameters. The subflow takes two inputs and does its work in a task, with type hints and logging.
from typing import Iterable, Tuple

from prefect import flow, task, get_run_logger


@task
def do_work(a: int, b: int) -> int:
    # Put the compute/IO in tasks (best practice)
    return a + b


@flow(name="compute-sum-subflow")
def compute_sum(a: int, b: int) -> int:
    logger = get_run_logger()
    logger.info(f"Starting subflow with a={a}, b={b}")
    result = do_work.submit(a, b).result()  # .submit() returns a future; .result() waits for the value
    logger.info(f"Subflow result: {result}")
    return result


@flow(name="parent-flow")
def parent_flow(param_sets: Iterable[Tuple[int, int]] = ((1, 2), (10, 20), (5, 7))) -> list[int]:
    logger = get_run_logger()
    logger.info("Invoking subflow multiple times with different parameters")

    results: list[int] = []
    for a, b in param_sets:
        # Each call creates a child flow run visible in the UI
        r = compute_sum(a=a, b=b)
        results.append(r)

    logger.info(f"All subflow results: {results}")
    return results


if __name__ == "__main__":
    parent_flow()
Notes
- Subflows are just flows called from another flow; each call creates a child flow run.
- Keep heavy logic or side effects in tasks, and use type hints for clarity.
- Inside a flow, calling a task directly (`do_work(a, b)`) runs it and returns its value; `.submit(...)` returns a future instead, and `.result()` blocks until the value is ready.
Docs
- Subflows
- Tasks
@Marvin the subflows must run concurrently
Great, here’s a best-practices Prefect 3.x pattern for running the same subflow concurrently with different parameters. We wrap the subflow call in a task and submit many of those tasks; the parent flow’s task runner then executes them concurrently.
from typing import Iterable, Tuple, List

from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
# For CPU-bound work, consider: from prefect.task_runners import ProcessPoolTaskRunner


@task
def do_work(a: int, b: int) -> int:
    # Put heavy/IO work in tasks
    return a + b


@flow(name="compute-sum-subflow")
def compute_sum(a: int, b: int) -> int:
    logger = get_run_logger()
    logger.info(f"Starting subflow with a={a}, b={b}")
    result = do_work.submit(a, b).result()
    logger.info(f"Subflow result: {result}")
    return result


@task
def invoke_subflow(a: int, b: int) -> int:
    # Wrap the subflow so we can submit many in parallel
    return compute_sum(a=a, b=b)


@flow(
    name="parent-flow",
    task_runner=ThreadPoolTaskRunner(max_workers=8),  # adjust for your workload
    # For CPU-bound work, you can use: task_runner=ProcessPoolTaskRunner(max_workers=8)
)
def parent_flow(
    param_sets: Iterable[Tuple[int, int]] = ((1, 2), (10, 20), (5, 7))
) -> List[int]:
    logger = get_run_logger()
    logger.info("Submitting subflows concurrently")

    futures = [invoke_subflow.submit(a, b) for a, b in param_sets]
    results = [f.result() for f in futures]

    logger.info(f"All subflow results: {results}")
    return results


if __name__ == "__main__":
    parent_flow()
Why this pattern
- Calling a sync subflow directly blocks until it finishes; wrapping the call in a task lets you submit many subflow invocations concurrently under the parent’s task runner.
- Keep heavy logic in tasks; use ThreadPoolTaskRunner for IO-bound or ProcessPoolTaskRunner for CPU-bound concurrency.
Related docs
- Subflows
- Task runners and concurrency
Optional: if you need each subflow to run on its own infrastructure/worker, create a deployment for the subflow and trigger the runs concurrently via tasks that call `run_deployment`. The pattern above is the simplest in-process concurrent option.
@Marvin But is calling a subflow in a task really the only way? Aren’t there any cleaner methods?
Great question. No, wrapping a subflow call in a task is not the only way. Here are two cleaner, fully supported patterns in Prefect 3.x:
1) Bind the subflow to infrastructure and call `.submit()` directly
- This turns your flow into an `InfrastructureBoundFlow`, so you can submit it like a task and get a `PrefectFlowRunFuture` back (clean futures and easy result retrieval).
- Requires a work pool (and a running worker) for the chosen worker type.
from typing import Iterable, Tuple, List

from prefect import flow, task, get_run_logger
from prefect.flows import bind_flow_to_infrastructure
from prefect.workers import ProcessWorker
from prefect.futures import wait


@task
def do_work(a: int, b: int) -> int:
    return a + b


@flow(name="compute-sum-subflow")
def compute_sum(a: int, b: int) -> int:
    logger = get_run_logger()
    logger.info(f"Starting subflow with a={a}, b={b}")
    return do_work.submit(a, b).result()


@flow(name="parent-flow")
def parent_flow(
    param_sets: Iterable[Tuple[int, int]] = ((1, 2), (10, 20), (5, 7))
) -> List[int]:
    # Bind the subflow to a work pool so we can submit it concurrently
    # Ensure a Process work pool named "process-pool" exists and a worker is running
    ibf = bind_flow_to_infrastructure(
        compute_sum,
        work_pool="process-pool",
        worker_cls=ProcessWorker,
    )

    futures = [ibf.submit(a=a, b=b) for a, b in param_sets]
    wait(futures)  # optional: block until completed
    return [f.result() for f in futures]


if __name__ == "__main__":
    parent_flow()
2) Submit flows to the Runner (no deployment required)
- Fire-and-forget style submission that returns `FlowRun` objects; you can then wait for all submitted runs.
- Best when you don’t need immediate Python return values (if you do, prefer the infrastructure-bound flow pattern above).
from typing import Iterable, Tuple

from prefect import flow, get_run_logger
from prefect.runner import submit_to_runner, wait_for_submitted_runs


@flow
def compute_sum(a: int, b: int) -> int:
    return a + b


@flow
def parent_flow(param_sets: Iterable[Tuple[int, int]] = ((1, 2), (10, 20), (5, 7))):
    logger = get_run_logger()
    params = [{"a": a, "b": b} for a, b in param_sets]

    # Submit many subflow runs concurrently
    runs = submit_to_runner(compute_sum, parameters=params)
    logger.info(f"Submitted {len(runs)} child runs")

    # Wait for all submitted runs tied to this parent to finish
    wait_for_submitted_runs()
    logger.info("All child runs completed")
    return [r.id for r in runs]
When to choose which
- Want futures and easy result values with no deployments? Use the infrastructure-bound flow `.submit()` pattern.
- Want to fan out runs without futures/return values (or plan to read results from storage/state)? Use `submit_to_runner`.
- Need each subflow on its own deployment/infrastructure? Create a deployment for the subflow and trigger many runs with `run_deployment` in parallel (typically via tasks or threads).
Docs
- Run work concurrently
- Submit flows directly to dynamic infrastructure
- Runner submission API