Ben Griffiths
09/09/2025, 10:26 AM
Marvin
09/09/2025, 10:26 AM
Marvin
09/09/2025, 10:35 AM
submit() like tasks. In Prefect 3.x, there are three common, “best practice” patterns for running subflows concurrently from a parent flow:
1) In-process concurrency: wrap each subflow call in a task and submit those tasks
- This is the simplest approach when your child flows can run in the same process as the parent. Use a task runner to get real concurrency.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner  # or ProcessPoolTaskRunner

@flow
def child_flow(x: int) -> int:
    # ... do work ...
    return x * 2

@task
def run_child(x: int) -> int:
    # Calling a flow inside a task creates a recorded subflow run
    return child_flow(x)

@flow(task_runner=ThreadPoolTaskRunner())
def parent_flow():
    futures = [run_child.submit(i) for i in range(5)]
    results = [f.result() for f in futures]
    return results
- Use ThreadPoolTaskRunner for I/O-bound work; ProcessPoolTaskRunner for CPU-bound work.
- You can stream results as they finish with prefect.futures.as_completed.
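Streaming results means handling each child as soon as it finishes instead of in submission order. As a plain standard-library analogy (not Prefect's API), concurrent.futures.as_completed behaves the same way; child_work is a hypothetical stand-in for the child flow.

```python
import concurrent.futures
import time


def child_work(x: int) -> int:
    # Hypothetical stand-in for child_flow: larger inputs finish sooner.
    time.sleep(0.05 / (x + 1))
    return x * 2


def stream_results(n: int = 5) -> list:
    finished = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(child_work, i) for i in range(n)]
        # as_completed yields each future as soon as it finishes,
        # so results arrive in completion order, not submission order.
        for fut in concurrent.futures.as_completed(futures):
            finished.append(fut.result())
    return finished


results = stream_results()
```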
2) Async parent flow with concurrent execution via Prefect’s gather
- If you prefer async, Prefect exposes a helper that runs callables concurrently using structured concurrency.
from prefect import flow
from prefect.utilities.asyncutils import gather

@flow
async def child_flow(x: int) -> int:
    # must be async: gather() expects callables that return coroutines
    return x * 2

@flow
async def parent_flow():
    r1, r2, r3 = await gather(
        lambda: child_flow(1),
        lambda: child_flow(2),
        lambda: child_flow(3),
    )
    return [r1, r2, r3]
- This keeps everything inside the parent process but runs the child calls concurrently.
- Under the hood, Prefect uses AnyIO for structured concurrency.
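The children only overlap if each call returns an awaitable that yields control, for example while waiting on I/O. This plain-asyncio sketch (no Prefect involved; child and parent are hypothetical stand-ins for the flows above) records how many children are in flight at once.

```python
import asyncio


async def child(x: int, state: dict) -> int:
    # Count how many child coroutines are running at the same time.
    state["running"] += 1
    state["peak"] = max(state["peak"], state["running"])
    await asyncio.sleep(0.01)  # stands in for awaited I/O
    state["running"] -= 1
    return x * 2


async def parent():
    state = {"running": 0, "peak": 0}
    # gather schedules all three coroutines before awaiting any result,
    # and returns results in argument order.
    results = await asyncio.gather(*(child(i, state) for i in (1, 2, 3)))
    return results, state["peak"]


results, peak = asyncio.run(parent())
```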
3) Distributed concurrency: trigger child deployments concurrently
- If you want each child to run on its own worker/infrastructure (best for scale or strong isolation), deploy the child flow and trigger the deployments concurrently (e.g., via tasks).
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger_child(i: int):
    # as_subflow=True records it as a subflow of the parent
    fr = run_deployment(
        name="Child Flow/child-deployment",
        parameters={"x": i},
        as_subflow=True,
    )
    return fr  # FlowRun object

@flow
def parent_flow():
    futs = [trigger_child.submit(i) for i in range(10)]
    return [f.result() for f in futs]
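- run_deployment waits for the child run to finish by default (timeout=None). timeout sets a maximum wait in seconds (0 returns immediately without waiting), and poll_interval sets how often it checks the run's state.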
- This pattern lets your children fan out across your work pool and scale independently of the parent.
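Waiting with timeout and poll_interval amounts to re-checking the child run's state on an interval until it reaches a terminal state or the timeout expires. The loop below is a hypothetical plain-Python illustration of that behavior, not Prefect's implementation; get_state and the set of state names are assumptions.

```python
import time
from typing import Callable, Optional

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def wait_for_run(
    get_state: Callable[[], str],
    timeout: Optional[float] = None,
    poll_interval: float = 5.0,
) -> str:
    """Poll get_state() until a terminal state is seen or timeout elapses."""
    start = time.monotonic()
    while True:
        state = get_state()
        if state in TERMINAL:
            return state
        if timeout is not None and time.monotonic() - start >= timeout:
            # Stop waiting; in this sketch nothing is cancelled.
            return state
        time.sleep(poll_interval)
```

For example, a run that goes SCHEDULED, RUNNING, COMPLETED returns "COMPLETED", while a run stuck in RUNNING returns "RUNNING" once the timeout passes.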
Notes
- Task runners still exist in Prefect 3.x and are the mechanism for in-process concurrency.
- Calling a @flow from within another flow (even inside a task) is recorded as a subflow run.
- Choose ThreadPool vs ProcessPool based on I/O-bound vs CPU-bound work; choose deployments when you want distributed execution.
Docs and references
- Run work concurrently (Prefect 3.x)
- How AnyIO powers Prefect’s async architecture
Quick check: Do you want the subflows to run concurrently in the same process as the parent (simpler, great for I/O work), or should they be distributed to separate workers (better isolation/scale)? I can tailor an example to your setup.
Ben Griffiths
09/09/2025, 10:58 AM
Marvin
09/09/2025, 10:58 AM
Marvin
09/09/2025, 11:00 AM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner  # or ProcessPoolTaskRunner

@flow
def child_flow(x: int) -> int:
    return x * 2

@task
def run_child(x: int) -> int:
    return child_flow(x)  # recorded as a subflow

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def parent_flow():
    futs = [run_child.submit(i) for i in range(100)]
    return [f.result() for f in futs]
- ThreadPoolTaskRunner is great for I/O-bound work; use ProcessPoolTaskRunner for CPU-bound work.
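max_workers is what caps how many wrapper tasks run at once. As a standard-library analogy (not Prefect code), a ThreadPoolExecutor with max_workers=10 never has more than 10 calls in flight, however many are submitted; child_work is a hypothetical stand-in for the child flow.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0
peak = 0


def child_work(x: int) -> int:
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.01)  # simulated I/O
    with lock:
        running -= 1
    return x * 2


# 100 submissions, but at most 10 worker threads execute them.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(child_work, range(100)))
```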
Option B: Use Prefect’s orchestration-level concurrency limits (by tag)
- Tag the wrapper task and create a concurrency limit for that tag. Prefect will only allow 10 of those tasks to enter Running at once, even if more are submitted.
from prefect import flow, task

@flow
def child_flow(x: int) -> int:
    return x * 2

@task(tags=["child-subflow"])
def run_child(x: int) -> int:
    return child_flow(x)

@flow
def parent_flow():
    futs = [run_child.submit(i) for i in range(100)]
    return [f.result() for f in futs]
Then create the limit:
prefect concurrency-limit create child-subflow 10
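You can also acquire a limit explicitly in code. The concurrency context manager uses a global concurrency limit, which is separate from the tag-based limit above, so create one with the same name first (prefect global-concurrency-limit create child-subflow --limit 10):
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def run_child(x: int) -> int:
    with concurrency("child-subflow"):  # requires a global concurrency limit with this name
        return child_flow(x)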
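The practical effect of a tag-based limit is that tagged tasks beyond the limit wait instead of running, even when the task runner has spare threads. Prefect enforces the real limit through its API. As an in-process analogy only, a per-tag semaphore produces the same cap; the LIMITS registry and the run_tagged helper are hypothetical.

```python
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Hypothetical registry loosely mirroring `prefect concurrency-limit create child-subflow 10`.
LIMITS = {"child-subflow": 10}
_slots = {tag: threading.BoundedSemaphore(n) for tag, n in LIMITS.items()}
_stats_lock = threading.Lock()
running = defaultdict(int)
peak = defaultdict(int)


def run_tagged(tag: str, fn, *args):
    # Block until a slot for this tag is free, then "enter Running".
    with _slots[tag]:
        with _stats_lock:
            running[tag] += 1
            peak[tag] = max(peak[tag], running[tag])
        try:
            return fn(*args)
        finally:
            with _stats_lock:
                running[tag] -= 1


def child_work(x: int) -> int:
    time.sleep(0.01)  # simulated work
    return x * 2


# The executor allows 50 threads, but the tag limit still caps execution at 10.
with ThreadPoolExecutor(max_workers=50) as pool:
    futs = [pool.submit(run_tagged, "child-subflow", child_work, i) for i in range(100)]
    results = [f.result() for f in futs]
```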
Option C: If you want distributed subflows, trigger deployments and use a pool with concurrency=10
- Put the child flow in its own deployment that targets a work pool with concurrency set to 10. Then trigger those deployments from the parent (each child runs on a worker, and the pool enforces the 10‑at‑a‑time limit).
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger_child(i: int):
    return run_deployment(
        name="Child Flow/child-deployment",
        parameters={"x": i},
        as_subflow=True,  # records parent/child relationship
    )

@flow
def parent_flow():
    futs = [trigger_child.submit(i) for i in range(100)]
    return [f.result() for f in futs]
Pick A or B if you’re keeping everything in-process; pick C if you want each subflow to fan out to workers and be throttled by a pool.
References
- Run work concurrently (Prefect 3.x)
- prefect concurrency-limit
CLI (manage tag-based limits): run prefect concurrency-limit --help for details
If you can share whether your child work is I/O or CPU bound and whether you want in‑process vs distributed execution, I can recommend the best option and tailor the snippet further.
Ben Griffiths
09/09/2025, 11:05 AM
Marvin
09/09/2025, 11:05 AM
Marvin
09/09/2025, 11:06 AM
run_deployment() from your parent flow to launch child flow runs concurrently.
How to do it
- Wrap run_deployment in a task and submit many of them. Each call creates a separate child flow run on your workers and they’ll run in parallel, subject to your pool’s limits.
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger_child(i: int):
    # as_subflow=True records the parent/child relation in the UI
    # timeout/poll_interval control waiting behavior while the task runs
    return run_deployment(
        name="Child Flow/child-deployment",
        parameters={"x": i},
        as_subflow=True,
        timeout=3600,
        poll_interval=5,
    )

@flow
def parent_flow():
    futs = [trigger_child.submit(i) for i in range(100)]
    runs = [f.result() for f in futs]  # wait for each child run
    return runs
How to cap concurrency at 10
- Set the concurrency on the work pool targeted by the child deployment(s) to 10. The pool will enforce that only 10 child runs are started at a time; the rest will queue.
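- Alternatively (or additionally), use a tag-based concurrency limit if you want cross-pool throttling:
  - Tag the trigger_child task with a tag, e.g. child-subflow. Tag-based limits apply to task runs, not flow runs; while each trigger task is waiting on its child run, capping the tasks also caps the child runs.
  - Create a limit: prefect concurrency-limit create child-subflow 10
  - If you would rather acquire a limit explicitly in code with with concurrency("child-subflow"):, note that this uses a global concurrency limit (prefect global-concurrency-limit create child-subflow --limit 10), not the tag-based one.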
Notes
- Using run_deployment launches truly separate flow runs on workers, so pool-level concurrency applies. This is different from calling a subflow function directly inside the parent process, where pool limits do not apply.
- If you need to collect return values from child flows, consider in-process subflows (flow calls inside tasks) or persist results and read them back; run_deployment runs out-of-process and does not directly return the child flow’s Python return value.
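"Persist results and read them back" can be as simple as each child writing to shared storage under a key the parent knows. A minimal sketch, assuming a shared directory and a run-ID naming scheme (both hypothetical; in practice this might be Prefect's own result storage or an object store):

```python
import json
import tempfile
from pathlib import Path


def child_job(x: int, run_id: str, results_dir: Path) -> None:
    # An out-of-process child cannot hand a Python object back to the parent,
    # so it writes its result where both sides can read it.
    (results_dir / f"{run_id}.json").write_text(json.dumps({"x": x, "value": x * 2}))


def collect(run_ids: list, results_dir: Path) -> dict:
    # After the child runs finish, the parent reads each result back by key.
    return {
        rid: json.loads((results_dir / f"{rid}.json").read_text())["value"]
        for rid in run_ids
    }


with tempfile.TemporaryDirectory() as d:
    out = Path(d)  # stands in for shared storage (e.g. a bucket or NFS path)
    ids = [f"child-{i}" for i in range(3)]
    for i, rid in enumerate(ids):
        child_job(i, rid, out)
    collected = collect(ids, out)
```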
Docs
- Run deployments (Prefect 3.x)
- prefect concurrency-limit
CLI help: run prefect concurrency-limit --help in your shell
If you share how your child flows are deployed (which work pool) I can suggest the exact pool setting or tag strategy to enforce “10 at a time.”
Ben Griffiths
09/09/2025, 11:08 AM
Marvin
09/09/2025, 11:09 AM
Marvin
09/09/2025, 11:10 AM
prefect global-concurrency-limit create my-shared-file --limit 1
2) Wrap the exact file I/O with the concurrency context in the code that touches the file (ideally inside the child flow or the task it calls):
- Synchronous:
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def child_flow(x: int):
    # ... prep work ...
    with concurrency("my-shared-file"):
        with open("/path/to/local.file", "a+") as f:
            f.write(f"{x}\n")
    # ... post work ...
- Asynchronous:
from prefect import flow
from prefect.concurrency.asyncio import concurrency

@flow
async def child_flow(x: int):
    async with concurrency("my-shared-file"):
        # do async file I/O or call sync via a thread
        ...
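For the "call sync via a thread" case, a plain-asyncio analogy looks like this: asyncio.Lock stands in for the limit (it only coordinates within one process, unlike the Prefect limit), and asyncio.to_thread runs the blocking write off the event loop. The helper names and file path are hypothetical.

```python
import asyncio
import tempfile
from pathlib import Path


def append_line(path: Path, text: str) -> None:
    # Blocking file I/O, kept synchronous on purpose.
    with open(path, "a") as f:
        f.write(text + "\n")


async def child(x: int, path: Path, lock: asyncio.Lock) -> None:
    async with lock:  # serialize only the file access
        # Run the blocking write in a worker thread so the event loop stays free.
        await asyncio.to_thread(append_line, path, str(x))


async def main(path: Path) -> None:
    lock = asyncio.Lock()
    await asyncio.gather(*(child(i, path, lock) for i in range(10)))


with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "local.file"
    asyncio.run(main(target))
    lines = target.read_text().splitlines()
```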
Notes and variants
- If you keep subflows in-process (ThreadPoolTaskRunner/ProcessPoolTaskRunner), the context above still works and is the simplest way to avoid races.
- If you use run_deployment (distributed across workers), keep the same concurrency guard inside the child flow. This serializes access even across machines—as long as all runs talk to the same Prefect server/cloud. Make sure the “local file” actually lives on a shared filesystem accessible to those workers; otherwise each worker will read/write a different file.
- If you prefer coarse-grained control, you can tag the entire task that does the file I/O and create a tag-based limit instead of using the context manager:
- Task:
@task(tags=["my-shared-file"])
def write_file(x: int): ...
- Limit:
prefect concurrency-limit create my-shared-file 1
This serializes the whole task; the context manager lets you serialize just the critical section.
- You can allow limited parallelism by setting the limit > 1 (e.g., 5 readers), or combine multiple names in concurrency(["file-A", "file-B"]) if a task touches multiple resources.
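The two variants above can be seen in practice with an in-process analogy: one semaphore per name, a limit of 5 for readers, and a helper that holds several names at once. The names, slot counts, and acquire_all helper are hypothetical. Acquiring in sorted order is this sketch's own deadlock-avoidance choice, not a statement about how Prefect acquires multiple limits.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical named limits: up to 5 concurrent readers of file-A, 1 holder of file-B.
limits = {"file-A": threading.BoundedSemaphore(5), "file-B": threading.BoundedSemaphore(1)}


class acquire_all:
    """Hold several named limits at once; sorted order avoids lock-ordering deadlocks."""

    def __init__(self, names):
        self.names = sorted(set(names))

    def __enter__(self):
        for name in self.names:
            limits[name].acquire()

    def __exit__(self, *exc):
        for name in reversed(self.names):
            limits[name].release()
        return False


stats_lock = threading.Lock()
active = 0
peak = 0


def reader(_):
    global active, peak
    with acquire_all(["file-A"]):
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # simulated read
        with stats_lock:
            active -= 1


with ThreadPoolExecutor(max_workers=20) as pool:
    list(pool.map(reader, range(40)))

with acquire_all(["file-B", "file-A"]):
    pass  # a task touching both resources holds both limits here
```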
Helpful docs
- Tag-based concurrency limits
- Concurrency context manager API
Quick check: Is your “local file” on the same host as the parent flow, or on a shared filesystem that all workers can access? If it’s truly local to one machine, I’d keep subflows in-process and guard with the concurrency context for the simplest, safest setup.
Ben Griffiths
09/09/2025, 11:20 AM
Marvin
09/09/2025, 11:20 AM
Marvin
09/09/2025, 11:22 AM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@flow(log_prints=False, persist_result=False)  # reduce overhead
def grandchild_flow(x): ...

@flow(log_prints=False, persist_result=False)
def child_flow(x):
    return grandchild_flow(x)

@task
def run_child(x):
    return child_flow(x)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def parent_flow():
    futs = [run_child.submit(i) for i in range(100)]
    return [f.result() for f in futs]
- If your code currently does for ...: child_flow(i) without submit/gather, it will be sequential.
2) Tune concurrency without oversubscribing
- Parent concurrency: set ThreadPoolTaskRunner(max_workers=...) (I/O‑bound) or ProcessPoolTaskRunner (CPU‑bound).
- Avoid “nested” pools multiplying threads: if you run many child flows in parallel, consider making each child flow run its internals sequentially:
@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
def child_flow(x): ...
Then set the parent’s max_workers high to parallelize across children, not within each child.
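The multiplication is easy to observe with two nested standard-library pools: the number of leaf units running at once is bounded by the product of the two max_workers values. This is a stdlib analogy, not Prefect code; the worker counts and the leaf and child functions are arbitrary stand-ins.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

PARENT_WORKERS = 5
CHILD_WORKERS = 2

stats_lock = threading.Lock()
active = 0
peak = 0


def leaf(_):
    global active, peak
    with stats_lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.01)  # simulated work inside a child
    with stats_lock:
        active -= 1


def child(_):
    # Each child opens its own pool, so inner parallelism multiplies outer parallelism.
    with ThreadPoolExecutor(max_workers=CHILD_WORKERS) as inner:
        list(inner.map(leaf, range(6)))


with ThreadPoolExecutor(max_workers=PARENT_WORKERS) as outer:
    list(outer.map(child, range(20)))

worst_case = PARENT_WORKERS * CHILD_WORKERS
```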
3) Reduce orchestration overhead
- Turn off extras you don’t need:
- Flows: @flow(log_prints=False, persist_result=False)
- Tasks: @task(log_prints=False, persist_result=False)
- Minimize chatty logging inside tight loops.
- Convert deeply nested “leaf” subflows into tasks when you don’t need subflow‑level orchestration. Tasks are lighter weight:
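from prefect import flow, task

@task
def leaf_step(x): ...

@flow
def child_flow(x):
    # fan out tasks instead of calling another subflow
    # (compute_items is a placeholder for however you derive the work items)
    futs = [leaf_step.submit(y) for y in compute_items(x)]
    return [f.result() for f in futs]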
4) If you’re using run_deployment: scale out and let the pool do the work
- Ensure the child deployment targets a work pool with enough concurrency and enough workers online to consume work.
- Trigger children concurrently from the parent (wrap in a task and submit). Pool concurrency will gate throughput; increase it to speed up.
Docs: Run deployments
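As a back-of-the-envelope check: if every child takes roughly the same time and enough workers are online, the pool works through children in "waves" of its concurrency, so wall time scales with ceil(N / concurrency). A hypothetical estimate that ignores worker startup and queueing overhead:

```python
import math


def estimated_wall_time(n_children: int, pool_concurrency: int, seconds_each: float) -> float:
    """Idealized wall time when at most pool_concurrency children run at once.

    Ignores worker startup, scheduling latency, and uneven run times.
    """
    waves = math.ceil(n_children / pool_concurrency)
    return waves * seconds_each
```

Doubling the pool concurrency from 10 to 20 roughly halves the estimate for 100 one-minute children (600 s to 300 s), which is why raising it is the main lever here.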
5) Don’t over‑serialize with locks
- If you added a concurrency limit for local file safety, narrow it to just the file I/O section. A global limit of 1 around the whole child will serialize everything.
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def child_flow(x):
    # compute freely...
    with concurrency("my-shared-file"):  # keep the critical section minimal
        write_to_file(x)  # placeholder for your own file-writing helper
Docs: Tag-based concurrency limits
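The cost of a wide critical section can be measured with a stdlib analogy: a threading.Lock stands in for the limit of 1, and the sketch reports the peak number of threads inside the compute step when the lock wraps everything versus only the write. The compute step and the write stand-in are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

file_lock = threading.Lock()   # stands in for a limit of 1 on the file
stats_lock = threading.Lock()


def run(narrow: bool, n: int = 8) -> int:
    """Return the peak number of threads inside the compute step."""
    active = 0
    peak = 0

    def compute():
        nonlocal active, peak
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.02)  # simulated compute
        with stats_lock:
            active -= 1

    def child(_):
        if narrow:
            compute()          # runs in parallel
            with file_lock:    # only the write is serialized
                pass           # stand-in for write_to_file(x)
        else:
            with file_lock:    # the whole child is serialized
                compute()

    with ThreadPoolExecutor(max_workers=n) as pool:
        list(pool.map(child, range(n)))
    return peak
```

With the wide lock the peak is always 1; with the narrow lock the compute steps overlap.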
6) Match the runner to the workload
- I/O‑bound: ThreadPoolTaskRunner is usually best.
- CPU‑bound: ProcessPoolTaskRunner can help; just avoid large cross‑process object passing to reduce pickling overhead.
If you can share:
- whether you call child/grandchild via direct flow calls, tasks, or run_deployment
- whether the work is CPU or I/O bound
- whether you’ve added any concurrency locks for file access
…I’ll tailor an exact setup (runners, limits, and code) to maximize throughput for your case.
More on concurrency patterns: Run work concurrently (Prefect 3.x).
Ben Griffiths
09/09/2025, 11:25 AM
Marvin
09/09/2025, 11:25 AM
Marvin
09/09/2025, 11:26 AM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@flow  # default task runner; tasks called directly (without .submit()) run sequentially
def child_flow(i: int):
    # do child work (optionally with a few tasks)
    ...

@task
def run_child(i: int):
    return child_flow(i)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def parent_flow():
    futs = [run_child.submit(i) for i in range(50)]
    return [f.result() for f in futs]
- If each child must do its own parallel work, balance the two levels so the product stays reasonable. For example:
- Parent max_workers=5, child max_workers=2 → worst-case ~10 concurrent units instead of the 100 you would get with 10 × 10.
@flow(task_runner=ThreadPoolTaskRunner(max_workers=2))
def child_flow(i: int): ...
@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(): ...
- If you’re using run_deployment (distributed children), the parent’s runner doesn’t matter for child execution. Configure the child flow’s runner for its internal tasks and use the work pool’s concurrency to control how many children run at once.
Rules of thumb
- Choose one level to be the main “fan-out” and keep the other level small or sequential.
- I/O-bound work → ThreadPoolTaskRunner; CPU-bound → ProcessPoolTaskRunner.
- Measure and adjust: start with parent=10 and child default; only add child parallelism if you see a real bottleneck inside each child.
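One quick way to "measure and adjust" is to time a stand-in workload at a few worker counts before changing the flows. This stdlib sketch assumes simulated I/O (time.sleep); real numbers depend entirely on your workload.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_bound(_):
    time.sleep(0.02)  # simulated I/O wait


def wall_time(max_workers: int, n_items: int = 40) -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(io_bound, range(n_items)))
    return time.perf_counter() - start


timings = {w: wall_time(w) for w in (1, 5, 10)}
```

For I/O-bound stand-ins the gains usually flatten once the pool is large enough to keep every item waiting in parallel; the point where they flatten is a reasonable starting max_workers.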
If you share whether your child flows run tasks internally and whether the work is CPU or I/O bound, I can suggest exact max_workers values.