<@ULVA73B9P> What is the latest best practice for ...
# ask-marvin
b
@Marvin What is the latest best practice for running subflows concurrently? I have a main flow which calls multiple subflows. I want the subflows to run concurrently rather than sequentially. Flows do not have a submit() function like tasks do.
m
thought for 522.4 seconds
Great question. You're right that flows don't have a `submit()` like tasks. In Prefect 3.x, there are three common "best practice" patterns for running subflows concurrently from a parent flow:
1) In-process concurrency: wrap each subflow call in a task and submit those tasks
- This is the simplest approach when your child flows can run in the same process as the parent. Use a task runner to get real concurrency.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner  # or ProcessPoolTaskRunner

@flow
def child_flow(x: int) -> int:
    # ... do work ...
    return x * 2

@task
def run_child(x: int) -> int:
    # Calling a flow inside a task creates a recorded subflow run
    return child_flow(x)

@flow(task_runner=ThreadPoolTaskRunner())
def parent_flow():
    futures = [run_child.submit(i) for i in range(5)]
    results = [f.result() for f in futures]
    return results
- Use `ThreadPoolTaskRunner` for I/O-bound work; `ProcessPoolTaskRunner` for CPU-bound work.
- You can stream results as they finish with `prefect.futures.as_completed`.
2) Async parent flow with concurrent execution via Prefect's gather
- If you prefer async, Prefect exposes a helper that runs callables concurrently using structured concurrency.
from prefect import flow
from prefect.utilities.asyncutils import gather

@flow
async def child_flow(x: int) -> int:
    # async so that each lambda in parent_flow returns a coroutine for gather to await
    return x * 2

@flow
async def parent_flow():
    r1, r2, r3 = await gather(
        lambda: child_flow(1),
        lambda: child_flow(2),
        lambda: child_flow(3),
    )
    return [r1, r2, r3]
- This keeps everything inside the parent process but runs the child calls concurrently.
- Under the hood, Prefect uses AnyIO for structured concurrency.
3) Distributed concurrency: trigger child deployments concurrently
- If you want each child to run on its own worker/infrastructure (best for scale or strong isolation), deploy the child flow and trigger the deployments concurrently (e.g., via tasks).
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger_child(i: int):
    # as_subflow=True records it as a subflow of the parent
    fr = run_deployment(
        name="Child Flow/child-deployment",
        parameters={"x": i},
        as_subflow=True,
    )
    return fr  # FlowRun object

@flow
def parent_flow():
    futs = [trigger_child.submit(i) for i in range(10)]
    return [f.result() for f in futs]
- `run_deployment` can wait for completion and supports `timeout` and `poll_interval` to control that behavior.
- This pattern lets your children fan out across your work pool and scale independently of the parent.
Notes
- Task runners still exist in Prefect 3.x and are the mechanism for in-process concurrency.
- Calling a `@flow` from within another flow (even inside a task) is recorded as a subflow run.
- Choose ThreadPool vs ProcessPool based on I/O-bound vs CPU-bound work; choose deployments when you want distributed execution.
Docs and references
- Run work concurrently (Prefect 3.x)
- How AnyIO powers Prefect's async architecture
Quick check: Do you want the subflows to run concurrently in the same process as the parent (simpler, great for I/O work), or should they be distributed to separate workers (better isolation/scale)? I can tailor an example to your setup.
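For intuition, the async pattern (2) has the same shape as plain asyncio. The sketch below is a standard-library analogue, not Prefect code: `child` and `parent` are hypothetical stand-ins for an async child flow and its parent, and the sleep only simulates I/O.

```python
import asyncio


async def child(x: int) -> int:
    # Stand-in for an async child flow; the sleep simulates I/O.
    await asyncio.sleep(0.01)
    return x * 2


async def parent() -> list[int]:
    # gather starts all three coroutines concurrently and returns
    # their results in the order the awaitables were passed in.
    return await asyncio.gather(child(1), child(2), child(3))


results = asyncio.run(parent())
print(results)
```

Results come back in call order, not completion order, so unpacking into `r1, r2, r3` stays predictable.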
b
@Marvin I've wrapped the subflow in a task. However, when I run the flow, it tries to run all the subflows at the same time, even though I have set the concurrency limit of the worker pool to 20. How can I limit the number of subflows running concurrently to, say, 10 at a time?
m
thought for 145.5 seconds
You're seeing that because the work pool's concurrency limit only applies to separate flow runs picked up by workers. When you wrap a subflow call in a task and submit many of those tasks, they all run inside the single parent flow run's process. The pool limit won't throttle those in-process tasks. You have a few good options to cap concurrent subflows at 10:
Option A: Limit the parent flow's task runner concurrency
- Sets an upper bound on how many tasks (including your subflow-wrapper task) run at once inside the parent.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner  # or ProcessPoolTaskRunner

@flow
def child_flow(x: int) -> int:
    return x * 2

@task
def run_child(x: int) -> int:
    return child_flow(x)  # recorded as a subflow

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def parent_flow():
    futs = [run_child.submit(i) for i in range(100)]
    return [f.result() for f in futs]
- ThreadPoolTaskRunner is great for I/O-bound work; use ProcessPoolTaskRunner for CPU-bound work.
Option B: Use Prefect's orchestration-level concurrency limits (by tag)
- Tag the wrapper task and create a concurrency limit for that tag. Prefect will only allow 10 of those tasks to enter Running at once, even if more are submitted.
from prefect import flow, task

@flow
def child_flow(x: int) -> int:
    return x * 2

@task(tags=["child-subflow"])
def run_child(x: int) -> int:
    return child_flow(x)

@flow
def parent_flow():
    futs = [run_child.submit(i) for i in range(100)]
    return [f.result() for f in futs]
Then create the limit:
prefect concurrency-limit create child-subflow 10
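You can also acquire a limit explicitly in code. Note that the `concurrency` context manager draws on global concurrency limits, which are managed separately from tag-based limits, so create one first (for example, `prefect gcl create child-subflow --limit 10`):
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def run_child(x: int) -> int:
    # occupies one slot of the global concurrency limit named "child-subflow"
    with concurrency("child-subflow"):
        return child_flow(x)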
Option C: If you want distributed subflows, trigger deployments and use a pool with concurrency=10
- Put the child flow in its own deployment that targets a work pool with concurrency set to 10. Then trigger those deployments from the parent (each child runs on a worker, and the pool enforces the 10-at-a-time limit).
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger_child(i: int):
    return run_deployment(
        name="Child Flow/child-deployment",
        parameters={"x": i},
        as_subflow=True,  # records parent/child relationship
    )

@flow
def parent_flow():
    futs = [trigger_child.submit(i) for i in range(100)]
    return [f.result() for f in futs]
Pick A or B if you're keeping everything in-process; pick C if you want each subflow to fan out to workers and be throttled by a pool.
References
- Run work concurrently (Prefect 3.x)
- `prefect concurrency-limit` CLI (manage tag-based limits): run `prefect concurrency-limit --help` for details
If you can share whether your child work is I/O or CPU bound and whether you want in-process vs distributed execution, I can recommend the best option and tailor the snippet further.
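To see why the pool size in Option A acts as the cap, here is a minimal standard-library sketch. It is an analogue, not Prefect code: `run_all` and `child` are hypothetical names, and `ThreadPoolExecutor` stands in for a thread-based task runner. It records the highest number of children running at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_all(n: int, max_workers: int) -> tuple[list[int], int]:
    lock = threading.Lock()
    active = 0  # children currently running
    peak = 0    # highest value `active` ever reached

    def child(x: int) -> int:
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # simulate I/O-bound work
        with lock:
            active -= 1
        return x * 2

    # The pool size is the cap: extra submissions wait in the pool's queue.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(child, i) for i in range(n)]
        results = [f.result() for f in futures]
    return results, peak


results, peak = run_all(n=50, max_workers=10)
print(len(results), "results; peak concurrency:", peak)
```

All 50 submissions are accepted immediately, but the observed peak never exceeds `max_workers`.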
b
@Marvin Can I use run_deployment() inside the main flow to run the subflow concurrently?
m
thought for 60.3 seconds
Yes. You can call `run_deployment()` from your parent flow to launch child flow runs concurrently.
How to do it
- Wrap `run_deployment` in a task and submit many of them. Each call creates a separate child flow run on your workers and they'll run in parallel, subject to your pool's limits.
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger_child(i: int):
    # as_subflow=True records the parent/child relation in the UI
    # timeout/poll_interval control waiting behavior while the task runs
    return run_deployment(
        name="Child Flow/child-deployment",
        parameters={"x": i},
        as_subflow=True,
        timeout=3600,
        poll_interval=5,
    )

@flow
def parent_flow():
    futs = [trigger_child.submit(i) for i in range(100)]
    runs = [f.result() for f in futs]  # wait for each child run
    return runs
How to cap concurrency at 10
- Set the concurrency on the work pool targeted by the child deployment(s) to 10. The pool will enforce that only 10 child runs are started at a time; the rest will queue.
- Alternatively (or additionally), use a tag-based concurrency limit if you want throttling that does not depend on a single pool:
  - Tag the `trigger_child` task with a tag, e.g. `child-subflow` (tag-based limits are enforced on task runs)
  - Create a limit: `prefect concurrency-limit create child-subflow 10`
  - You can also acquire a limit explicitly in code with `with concurrency("child-subflow"):` if desired. That context manager uses a global concurrency limit, which is created separately (e.g. `prefect gcl create child-subflow --limit 10`).
Notes
- Using `run_deployment` launches truly separate flow runs on workers, so pool-level concurrency applies. This is different from calling a subflow function directly inside the parent process, where pool limits do not apply.
- If you need to collect return values from child flows, consider in-process subflows (flow calls inside tasks) or persist results and read them back; `run_deployment` runs out-of-process and does not directly return the child flow's Python return value.
Docs
- Run deployments (Prefect 3.x)
- `prefect concurrency-limit` CLI help: run `prefect concurrency-limit --help` in your shell
If you share how your child flows are deployed (which work pool) I can suggest the exact pool setting or tag strategy to enforce "10 at a time."
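A slot-based limit behaves like a semaphore: every caller may start, but only a fixed number hold a slot at once. The sketch below is a standard-library analogue under that assumption, not Prefect code. `run_limited` and `child` are hypothetical names, and `asyncio.Semaphore` stands in for a named limit with 10 slots.

```python
import asyncio


async def run_limited(n: int, slots: int) -> tuple[list[int], int]:
    sem = asyncio.Semaphore(slots)  # plays the role of a limit with `slots` slots
    active = 0
    peak = 0

    async def child(x: int) -> int:
        nonlocal active, peak
        async with sem:  # waits here while all slots are taken
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulate waiting on a remote child run
            active -= 1
        return x * 2

    results = await asyncio.gather(*(child(i) for i in range(n)))
    return list(results), peak


results, peak = asyncio.run(run_limited(n=30, slots=10))
print(len(results), "results; peak holders:", peak)
```

Unlike a pool-size cap, the limit here is independent of how many callers exist, which is why the same idea can throttle work across several pools.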
b
@Marvin My subflow accesses a local file. Which concurrency approach is best to avoid file access errors?
m
thought for 90.2 seconds
Short answer: guard the file access with a Prefect concurrency limit, not just a work-pool limit.
Why: pool limits cap how many flow runs a pool executes, but they don't protect a specific shared resource. If multiple subflows/threads hit the same local file at once, you can still get races. Prefect's orchestration-level concurrency gives you a named "lock" you can acquire around the critical section, across threads, processes, and even workers.
Recommended pattern
1) Create a global concurrency limit for the file (1 slot = serialize access). The `concurrency` context manager used in step 2 reads global limits, not tag-based ones:
prefect gcl create my-shared-file --limit 1
2) Wrap the exact file I/O with the concurrency context in the code that touches the file (ideally inside the child flow or the task it calls):
- Synchronous:
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def child_flow(x: int):
    # ... prep work ...
    with concurrency("my-shared-file"):
        with open("/path/to/local.file", "a+") as f:
            f.write(f"{x}\n")
    # ... post work ...
- Asynchronous:
from prefect import flow
from prefect.concurrency.asyncio import concurrency

@flow
async def child_flow(x: int):
    async with concurrency("my-shared-file"):
        # do async file I/O or call sync via a thread
        ...
Notes and variants
- If you keep subflows in-process (ThreadPoolTaskRunner/ProcessPoolTaskRunner), the context above still works and is the simplest way to avoid races.
- If you use run_deployment (distributed across workers), keep the same concurrency guard inside the child flow. This serializes access even across machines, as long as all runs talk to the same Prefect server/cloud. Make sure the "local file" actually lives on a shared filesystem accessible to those workers; otherwise each worker will read/write a different file.
- If you prefer coarse-grained control, you can tag the entire task that does the file I/O and create a tag-based limit instead of using the context manager:
  - Task:
from prefect import task

@task(tags=["my-shared-file"])
def write_file(x: int): ...
- Limit:
prefect concurrency-limit create my-shared-file 1
This serializes the whole task; the context manager lets you serialize just the critical section.
- You can allow limited parallelism by setting the limit > 1 (e.g., 5 readers) or combine multiple names in `concurrency(["file-A", "file-B"])` if a task touches multiple resources.
Helpful docs
- Tag-based concurrency limits
- Concurrency context manager API
Quick check: Is your "local file" on the same host as the parent flow, or on a shared filesystem that all workers can access? If it's truly local to one machine, I'd keep subflows in-process and guard with the concurrency context for the simplest, safest setup.
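The idea of serializing only the critical section can be sketched with the standard library. This is an analogue, not Prefect code: `threading.Lock` stands in for a limit with one slot, it only protects threads within a single process, and `child`, `run_all`, and the file name are hypothetical.

```python
import tempfile
import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

file_lock = threading.Lock()  # in-process stand-in for a limit with one slot


def child(x: int, path: Path) -> None:
    line = f"{x * 2}\n"  # compute outside the lock
    with file_lock:      # only the write itself is serialized
        with path.open("a") as f:
            f.write(line)


def run_all(n: int, path: Path) -> list[int]:
    with ThreadPoolExecutor(max_workers=8) as pool:
        for fut in [pool.submit(child, i, path) for i in range(n)]:
            fut.result()  # re-raise any worker exception
    # Line order depends on scheduling, so sort before returning.
    return sorted(int(s) for s in path.read_text().splitlines())


with tempfile.TemporaryDirectory() as tmp:
    values = run_all(20, Path(tmp) / "shared.txt")
print(values == [i * 2 for i in range(20)])
```

Every write lands intact while the computation before it still runs in parallel. Across processes or machines a plain lock is not enough, which is the gap the orchestration-level limit is meant to fill.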
b
@Marvin My main flow called a child flow, which calls another child flow. The child flows are running very slowly. How can I speed up subflow execution?
m
thought for 133.3 seconds
Short version: nested subflows tend to be slow when they run sequentially, are over-serialized by locks, or are starved by low concurrency. You can speed them up by explicitly parallelizing, tuning task runners, reducing orchestration overhead, and (if needed) distributing them via deployments. Here are the main levers:
1) Ensure you're actually running children concurrently
- Call child flows from tasks and submit them, or use an async parent and gather:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@flow(log_prints=False, persist_result=False)  # reduce overhead
def grandchild_flow(x): ...

@flow(log_prints=False, persist_result=False)
def child_flow(x):
    return grandchild_flow(x)

@task
def run_child(x):
    return child_flow(x)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def parent_flow():
    futs = [run_child.submit(i) for i in range(100)]
    return [f.result() for f in futs]
- If your code currently does `for ...: child_flow(i)` without submit/gather, it will be sequential.
2) Tune concurrency without oversubscribing
- Parent concurrency: set `ThreadPoolTaskRunner(max_workers=...)` (I/O-bound) or `ProcessPoolTaskRunner` (CPU-bound).
- Avoid "nested" pools multiplying threads: if you run many child flows in parallel, consider making each child flow run its internals sequentially:
@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
def child_flow(...): ...
Then set the parent's `max_workers` high to parallelize across children, not within each child.
3) Reduce orchestration overhead
- Turn off extras you don't need:
  - Flows: `@flow(log_prints=False, persist_result=False)`
  - Tasks: `@task(log_prints=False, persist_result=False)`
- Minimize chatty logging inside tight loops.
- Convert deeply nested "leaf" subflows into tasks when you don't need subflow-level orchestration. Tasks are lighter weight:
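from prefect import flow, task

@task
def leaf_step(x): ...

@flow
def child_flow(x):
    # fan out tasks instead of calling another subflow
    # compute_items is a placeholder for however you derive the work items
    futs = [leaf_step.submit(y) for y in compute_items(x)]
    return [f.result() for f in futs]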
4) If you're using run_deployment: scale out and let the pool do the work
- Ensure the child deployment targets a work pool with enough concurrency and enough workers online to consume work.
- Trigger children concurrently from the parent (wrap in a task and submit). Pool concurrency will gate throughput; increase it to speed up.
Docs: Run deployments
5) Don't over-serialize with locks
- If you added a concurrency limit for local file safety, narrow it to just the file I/O section. A global limit of 1 around the whole child will serialize everything.
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def child_flow(x):
    # compute freely...
    with concurrency("my-shared-file"):   # keep the critical section minimal
        write_to_file(x)  # placeholder for your own file-writing helper
Docs: Tag-based concurrency limits
6) Match the runner to the workload
- I/O-bound: ThreadPoolTaskRunner is usually best.
- CPU-bound: ProcessPoolTaskRunner can help; just avoid large cross-process object passing to reduce pickling overhead.
If you can share:
- whether you call child/grandchild via direct flow calls, tasks, or `run_deployment`
- whether the work is CPU or I/O bound
- whether you've added any concurrency locks for file access
I'll tailor an exact setup (runners, limits, and code) to maximize throughput for your case. More on concurrency patterns: Run work concurrently (Prefect 3.x).
b
@Marvin Should I use ThreadPoolTaskRunner(max_workers=10) on all flows (parent and child) or just the parent flow?
m
thought for 20.3 seconds
Short answer: usually just the parent.
Why
- The task runner you set on a flow only controls concurrency of tasks within that flow run.
- If you set ThreadPoolTaskRunner(max_workers=10) on both parent and child, you can easily oversubscribe: 10 parent tasks each calling a child with its own 10-thread pool → up to 100 threads contending, which often slows things down.
Recommended patterns
- Parallelize across children: set concurrency at the parent, keep children mostly sequential.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@flow  # default runner; tasks called directly (not via .submit) run one after another
def child_flow(i: int):
    # do child work (optionally with a few tasks)
    ...

@task
def run_child(i: int):
    return child_flow(i)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def parent_flow():
    futs = [run_child.submit(i) for i in range(50)]
    return [f.result() for f in futs]
- If each child must do its own parallel work, balance the two levels so the product stays reasonable. For example:
  - Parent max_workers=5, child max_workers=2 → worst-case ~10 concurrent units instead of 100.
@flow(task_runner=ThreadPoolTaskRunner(max_workers=2))
def child_flow(i: int): ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def parent_flow(): ...
- If you're using run_deployment (distributed children), the parent's runner doesn't matter for child execution. Configure the child flow's runner for its internal tasks and use the work pool's concurrency to control how many children run at once.
Rules of thumb
- Choose one level to be the main "fan-out" and keep the other level small or sequential.
- I/O-bound work → ThreadPoolTaskRunner; CPU-bound → ProcessPoolTaskRunner.
- Measure and adjust: start with parent=10 and child default; only add child parallelism if you see a real bottleneck inside each child.
If you share whether your child flows run tasks internally and whether the work is CPU or I/O bound, I can suggest exact max_workers values.
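The "product of the two levels" arithmetic can be checked with a small standard-library sketch. It is an analogue, not Prefect code: `measure_peak` is a hypothetical helper in which each child opens its own thread pool, much as a child flow with its own task runner would, and it reports the highest number of work units running at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def measure_peak(parent_workers: int, child_workers: int,
                 children: int, units: int) -> int:
    lock = threading.Lock()
    active = 0
    peak = 0

    def unit(_: int) -> None:
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # simulate a small piece of I/O-bound work
        with lock:
            active -= 1

    def child(_: int) -> None:
        # Each child owns its own pool, so the two levels multiply.
        with ThreadPoolExecutor(max_workers=child_workers) as pool:
            list(pool.map(unit, range(units)))

    with ThreadPoolExecutor(max_workers=parent_workers) as pool:
        list(pool.map(child, range(children)))
    return peak


print(measure_peak(10, 10, children=20, units=20))  # bounded by 10 * 10
print(measure_peak(5, 2, children=20, units=20))    # bounded by 5 * 2
```

The peak is bounded by `parent_workers * child_workers`, which is why keeping one level small keeps the total thread count predictable.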