ÖmerFİ
09/27/2025, 2:15 PM
Marvin
09/27/2025, 2:15 PM
Marvin
09/27/2025, 2:20 PM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results

@flow(name="child-flow")
def child_flow(i: int) -> int:
    # your work here
    return i * i

@task
def run_child_once(i: int) -> int:
    # calling a flow inside a task runs it as a subflow
    return child_flow(i)

@flow(name="parent-flow", task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(n: int = 10):
    # submit every wrapper task first, then resolve all futures together
    futures = [run_child_once.submit(i) for i in range(n)]
    results = resolve_futures_to_results(futures)
    return results

if __name__ == "__main__":
    print(parent_flow(n=10))
2) Trigger X separate flow runs via a deployment (distributed, worker-executed)
- Create a deployment for the child flow
- From the parent flow, call run_deployment concurrently (wrap it in a task)
- Optionally wait for each run to finish and gather results
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.task_runners import ThreadPoolTaskRunner

@task
def trigger_child_run(i: int, wait: bool = True) -> str:
    fr = run_deployment(
        name="child-flow/prod",  # "<flow-name>/<deployment-name>"
        parameters={"i": i},
        as_subflow=True,  # True = appears as subflow in the parent
        # timeout=None (the default) polls until the run finishes;
        # timeout=0 returns as soon as the run has been created
        timeout=None if wait else 0,
    )
    return str(fr.id)

@flow(name="parent-flow", task_runner=ThreadPoolTaskRunner(max_workers=10))
def parent_flow(n: int = 10, wait: bool = True):
    # submit every trigger first so the runs start concurrently, then collect the ids
    futures = [trigger_child_run.submit(i, wait) for i in range(n)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    parent_flow(n=10, wait=True)
Notes
- Choose ThreadPoolTaskRunner for I/O-bound work; ProcessPoolTaskRunner for CPU-bound work.
- In pattern 2, set as_subflow=False if you prefer the child runs to be totally independent (not shown under the parent in the UI).
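To make the I/O-bound case concrete, here is a minimal standard-library sketch (not Prefect code). It assumes that a thread-based runner overlaps waiting time in roughly the same way a plain ThreadPoolExecutor does; fake_io_call and run_with_threads are hypothetical names, and the sleep stands in for a network or disk call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_call(i: int) -> int:
    """Hypothetical stand-in for an I/O-bound step; sleeping does not hold the GIL."""
    time.sleep(0.2)
    return i * i


def run_with_threads(n: int, max_workers: int) -> tuple[list[int], float]:
    """Run n fake I/O calls on a thread pool; return the results and elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(fake_io_call, range(n)))
    return results, time.perf_counter() - start


results, elapsed = run_with_threads(n=5, max_workers=5)
# Running the same five calls one after another would take about 5 * 0.2 seconds.
print(results, f"{elapsed:.2f}s")
```

For CPU-bound work the threads would compete for the interpreter instead of waiting, which is why the note above points to a process-based runner for that case.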
Would you like help tailoring this to your setup? For example:
- Same parameters each time or different?
- Do you want to wait for all runs to finish and aggregate results?
- Are you running locally only, or do you have workers executing deployments?
References
- Task runners overview: prefect.task_runners
- Parallel/mapped patterns: Beyond loops: task mapping
- AnyIO in Prefect’s async architecture: How AnyIO powers Prefect’s async architecture
ÖmerFİ
09/27/2025, 2:51 PM
Marvin
09/27/2025, 2:51 PM
Marvin
09/27/2025, 2:53 PM
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results

@flow
def child_flow(i: int) -> int:
    return i * i

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(n: int = 10):
    futures = [child_flow.submit(i=i) for i in range(n)]
    return resolve_futures_to_results(futures)

if __name__ == "__main__":
    print(parent_flow(10))
Notes
- Use a task runner on the parent (e.g., ThreadPoolTaskRunner) to enable parallelism.
- Collect results via .result() on each future or resolve_futures_to_results.
If you’re triggering deployments instead of calling the flow function directly, you can parallelize by submitting multiple calls to run_deployment (typically from a task), but for plain subflows, child_flow.submit(...) is all you need.
Refs: Task runners in Prefect
ÖmerFİ
09/27/2025, 2:58 PM
Marvin
09/27/2025, 2:58 PM
Marvin
09/27/2025, 2:59 PM
Flows don't have a .submit method; .submit is for tasks. My earlier example using child_flow.submit(...) was incorrect. Sorry for the confusion.
- Do you need a “middleware” wrapper? If you want to run multiple child flows concurrently within a parent flow, yes: you typically wrap the call to the child flow in a task and submit that task multiple times. Tasks are the unit that the task runner schedules concurrently.
- Do you have to specify a task runner? Not strictly: in Prefect 3 the default is ThreadPoolTaskRunner, so submitted tasks already run concurrently in threads. Set the runner explicitly on your parent flow to cap concurrency with max_workers or to switch to ProcessPoolTaskRunner.
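As a rough illustration of what a worker cap does, here is a standard-library sketch (not Prefect code). It assumes max_workers on a thread-based runner bounds concurrency the way it does on ThreadPoolExecutor; ConcurrencyMeter and run_capped are hypothetical helpers written only for this example.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyMeter:
    """Context manager that records the highest number of overlapping calls."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_flight = 0
        self.peak = 0

    def __enter__(self) -> "ConcurrencyMeter":
        with self._lock:
            self._in_flight += 1
            self.peak = max(self.peak, self._in_flight)
        return self

    def __exit__(self, *exc_info) -> bool:
        with self._lock:
            self._in_flight -= 1
        return False  # never swallow exceptions


def run_capped(n: int, max_workers: int) -> tuple[list[int], int]:
    """Submit n units of work to a capped pool; return results and peak overlap."""
    meter = ConcurrencyMeter()

    def work(i: int) -> int:
        with meter:
            time.sleep(0.05)  # stand-in for the child work
            return i * i

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(work, i) for i in range(n)]
        results = [f.result() for f in futures]
    return results, meter.peak


results, peak = run_capped(n=8, max_workers=2)
print(results, "peak overlap:", peak)  # peak never exceeds max_workers
```

All eight units are accepted immediately, but at most two run at any moment; the rest queue until a worker frees up.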
Example: concurrent subflows via a wrapper task
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import resolve_futures_to_results

@flow
def child_flow(i: int) -> int:
    return i * i

@task
def run_child(i: int) -> int:
    # Calling a flow inside a task creates a subflow
    return child_flow(i)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(n: int = 10):
    futures = [run_child.submit(i) for i in range(n)]
    return resolve_futures_to_results(futures)
If you don’t need subflow features (e.g., separate flow runs in the UI, own retries, deployment, etc.), consider making the “child” a task instead of a flow and submit it directly:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def child_task(i: int) -> int:
    return i * i

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(n: int = 10):
    # submit everything first, then read the results in submission order
    futures = [child_task.submit(i) for i in range(n)]
    return [f.result() for f in futures]
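A small standard-library sketch (not Prefect code) of why reading .result() over the list of futures gives results in submission order even when the work finishes out of order. The names slow_square, in_submission_order, and in_completion_order are hypothetical, and the assumption is that futures returned by a task runner behave like concurrent.futures futures in this respect.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(i: int, n: int) -> int:
    """Smaller inputs sleep longer, so later submissions tend to finish first."""
    time.sleep(0.02 * (n - i))
    return i * i


def in_submission_order(n: int) -> list[int]:
    """Iterate the futures list itself: output order matches input order."""
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(slow_square, i, n) for i in range(n)]
        return [f.result() for f in futures]


def in_completion_order(n: int) -> list[int]:
    """Iterate as_completed: output order follows whichever call finishes first."""
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(slow_square, i, n) for i in range(n)]
        return [f.result() for f in as_completed(futures)]


print(in_submission_order(4))  # [0, 1, 4, 9]
print(in_completion_order(4))  # same values, order depends on timing
```

Submission order is usually what you want when each result has to line up with its input parameter.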
Alternative: concurrent runs via deployments
- If the child is deployed and you want worker-executed isolation/scaling, trigger multiple runs with run_deployment (often from a task) and optionally wait on them.
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.task_runners import ThreadPoolTaskRunner

@task
def trigger(i: int, wait: bool = True) -> str:
    # timeout=None (the default) polls until the run finishes; timeout=0 returns immediately
    fr = run_deployment(
        name="child-flow/prod",
        parameters={"i": i},
        timeout=None if wait else 0,
    )
    return str(fr.id)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def parent_flow(n: int = 10, wait: bool = True):
    # submit all triggers before calling .result(); otherwise the runs execute one at a time
    futures = [trigger.submit(i, wait) for i in range(n)]
    return [f.result() for f in futures]
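One detail that matters for any submit-based pattern is where .result() is called. The standard-library sketch below (not Prefect code) compares calling .result() right after each submit with submitting everything first. trigger_stub is a hypothetical stand-in for a blocking trigger call, and the assumption is that Prefect futures block on .result() the same way concurrent.futures futures do.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def trigger_stub(i: int) -> str:
    """Hypothetical stand-in for a call that blocks until a child run finishes."""
    time.sleep(0.1)
    return f"run-{i}"


def eager(n: int) -> tuple[list[str], float]:
    """Call .result() right after each submit: every call waits for the previous one."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as pool:
        ids = [pool.submit(trigger_stub, i).result() for i in range(n)]
    return ids, time.perf_counter() - start


def batched(n: int) -> tuple[list[str], float]:
    """Submit everything first, then collect: the calls overlap."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(trigger_stub, i) for i in range(n)]
        ids = [f.result() for f in futures]
    return ids, time.perf_counter() - start


eager_ids, eager_seconds = eager(4)
batched_ids, batched_seconds = batched(4)
print(f"eager: {eager_seconds:.2f}s, batched: {batched_seconds:.2f}s")
```

Both versions return the same ids in the same order; only the batched version actually runs the calls side by side.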
Links
- Task runners: prefect.task_runners
- run_deployment API: prefect.deployments.flow_runs.run_deployment
If you share a bit more about your use case (subflows vs. deployments, CPU vs. I/O work, need to aggregate results, etc.), I can recommend the best pattern and runner settings.