<@ULVA73B9P> can i run a flow X times in another f...
# ask-marvin
ö
@Marvin can i run a flow X times in another flow conquerently
m
thought for 329.4 seconds
Yes—totally doable. Interpreting “conquerently” as “concurrently,” here are two common patterns in Prefect 3.x: 1) Run a child flow X times as subflows in the same process - Use a task runner (e.g., ThreadPoolTaskRunner) in the parent flow - Submit tasks that each call the child flow - Great for lightweight, local parallelism; parent waits and can collect results
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results

@flow(name="child-flow")
def child_flow(i: int) -> int:
    # your work here
    return i * i

@task
def run_child_once(i: int) -> int:
    # calling a flow inside a task runs it as a subflow
    return child_flow(i)

@flow(name="parent-flow", task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(n: int = 10):
    futures = [run_child_once.submit(i) for i in range(n)]
    results = resolve_futures_to_results(futures)
    return results

if __name__ == "__main__":
    print(parent_flow(n=10))
2) Trigger X separate flow runs via a deployment (distributed, worker-executed) - Create a deployment for the child flow - From the parent flow, call
run_deployment
concurrently (wrap it in a task) - Optionally wait for each run to finish and gather results
Copy code
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.task_runners import ThreadPoolTaskRunner

@task
def trigger_child_run(i: int) -> str:
    fr = run_deployment(
        name="child-flow/prod",          # "<flow-name>/<deployment-name>"
        parameters={"i": i},
        as_subflow=True                  # True = appears as subflow in the parent
    )
    return str(fr.id)

@flow(name="parent-flow", task_runner=ThreadPoolTaskRunner(max_workers=10))
def parent_flow(n: int = 10, wait: bool = True):
    run_ids = [trigger_child_run.submit(i) for i in range(n)]
    run_ids = [f.result() for f in run_ids]

    if wait:
        for rid in run_ids:
            wait_for_flow_run(rid, log_states=True)

    return run_ids

if __name__ == "__main__":
    parent_flow(n=10, wait=True)
Notes - Choose ThreadPoolTaskRunner for I/O-bound work; ProcessPoolTaskRunner for CPU-bound work. - In pattern 2, set
as_subflow=False
if you prefer the child runs to be totally independent (not shown under the parent in the UI). Would you like help tailoring this to your setup? For example: - Same parameters each time or different? - Do you want to wait for all runs to finish and aggregate results? - Are you running locally only, or do you have workers executing deployments? References - Task runners overview: prefect.task_runners - Parallel/mapped patterns: Beyond loops: task mapping - AnyIO in Prefect’s async architecture: How AnyIO powers Prefect’s async architecture
ö
@Marvin do i have to create a middleware function like run_child_once?
m
thought for 134.2 seconds
Short answer: No — you don’t need a wrapper task. In Prefect 3.x you can submit child flows directly from the parent for concurrent execution:
Copy code
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results

@flow
def child_flow(i: int) -> int:
    return i * i

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(n: int = 10):
    futures = [child_flow.submit(i=i) for i in range(n)]
    return resolve_futures_to_results(futures)

if __name__ == "__main__":
    print(parent_flow(10))
Notes - Use a task runner on the parent (e.g.,
ThreadPoolTaskRunner
) to enable parallelism. - Collect results via
.result()
on each future or
resolve_futures_to_results
. If you’re triggering deployments instead of calling the flow function directly, you can parallelize by submitting multiple calls to
run_deployment
(typically from a task) — but for plain subflows,
child_flow.submit(...)
is all you need. Refs: Task runners in Prefect
ö
@Marvin flow has no function submit as far as i know? and do i have to specify task_runner? does it have a default?
m
thought for 64.2 seconds
Great questions — and thanks for catching that. - You’re right: flows do not have a
.submit
method.
.submit
is for tasks. My earlier example using
child_flow.submit(...)
was incorrect — sorry for the confusion. - Do you need a “middleware” wrapper? If you want to run multiple child flows concurrently within a parent flow, yes: you typically wrap the call to the child flow in a task and submit that task multiple times. Tasks are the unit that the task runner schedules concurrently. - Do you have to specify a task runner? The default is
PrefectTaskRunner
, which runs tasks serially. To get concurrency, set a concurrent runner (e.g.,
ThreadPoolTaskRunner
or
ProcessPoolTaskRunner
) on your parent flow. Example: concurrent subflows via a wrapper task
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results

@flow
def child_flow(i: int) -> int:
    return i * i

@task
def run_child(i: int) -> int:
    # Calling a flow inside a task creates a subflow
    return child_flow(i)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(n: int = 10):
    futures = [run_child.submit(i) for i in range(n)]
    return resolve_futures_to_results(futures)
If you don’t need subflow features (e.g., separate flow runs in the UI, own retries, deployment, etc.), consider making the “child” a task instead of a flow and submit it directly:
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def child_task(i: int) -> int:
    return i * i

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(n: int = 10):
    futures = [child_task.submit(i) for i in range(n)]
    return [f.result() for f in futures]
Alternative: concurrent runs via deployments - If the child is deployed and you want worker-executed isolation/scaling, trigger multiple runs with
run_deployment
(often from a task) and optionally wait on them.
Copy code
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.task_runners import ThreadPoolTaskRunner

@task
def trigger(i: int) -> str:
    fr = run_deployment(name="child-flow/prod", parameters={"i": i})
    return str(fr.id)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def parent_flow(n: int = 10, wait: bool = True):
    run_ids = [trigger.submit(i).result() for i in range(n)]
    if wait:
        for rid in run_ids:
            wait_for_flow_run(rid, log_states=True)
    return run_ids
Links - Task runners: prefect.task_runners - run_deployment API: prefect.deployments.flow_runs.run_deployment If you share a bit more about your use case (subflows vs. deployments, CPU vs. I/O work, need to aggregate results, etc.), I can recommend the best pattern and runner settings.