Rob_C
09/25/2025, 7:44 PM
Marvin
09/25/2025, 7:44 PM
Nate
09/25/2025, 7:44 PM
async def but do they contain blocking code?
Rob_C
09/25/2025, 7:45 PM
Nate
09/25/2025, 7:45 PM
async thing
Rob_C
09/25/2025, 7:45 PM
Marvin
09/25/2025, 7:50 PM
Wrapping subflow calls in asyncio.gather won’t make them parallel.
- To run things concurrently, either:
1) Move the child logic into tasks and use a task runner with .submit, or
2) Create deployments for the child flows and kick them off with run_deployment concurrently (these will run as separate flow runs picked up by workers).
Details and examples:
1) If you just need concurrency inside a single run, use tasks + a task runner
- Best when the “children” don’t need to be independent flow runs.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner  # or ProcessPoolTaskRunner for CPU-bound

@task
def child_task(i):
    # do work
    return i * 2

@flow(task_runner=ThreadPoolTaskRunner())
def parent():
    futures = [child_task.submit(i) for i in range(10)]
    results = [f.result() for f in futures]
    return results
2) If you want each child to be its own flow run, use deployments + run_deployment
- This scales out across workers and can be done concurrently with asyncio.gather.
- By default, run_deployment will link the new flow run as a subflow (as_subflow=True), but it still runs as its own flow run on a worker.
import asyncio
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
async def parent():
    runs = await asyncio.gather(
        run_deployment(name="my-project/child-a", parameters={"x": 1}),
        run_deployment(name="my-project/child-b", parameters={"x": 2}),
        run_deployment(name="my-project/child-c", parameters={"x": 3}),
    )
    return runs
Notes and gotchas:
- If you call child_flow() directly from a parent flow, it will run as a subflow in-process and block the parent until it completes.
- If your run_deployment calls appear “sequential,” make sure you have enough workers/slots in your work pool; if there’s only one worker slot, flow runs will queue and execute one after the other.
- For CPU-bound parallelism inside a single run, switch to ProcessPoolTaskRunner.
- For global throttling across runs, consider Prefect’s concurrency limits: Global concurrency limits
- How-to on running work concurrently: Run work concurrently
- Running deployments programmatically: Run deployments
If you can share a minimal snippet of your parent/child flow code (and whether you’ve created deployments for the children), I can point out exactly why it’s running sequentially and suggest the most direct fix for your setup.
Nate
09/25/2025, 7:59 PM
> Wrapping subflow calls in asyncio.gather won’t make them parallel
true but parallel != concurrent and this is wrong
> - To run things concurrently, either:
> 1) Move the child logic into tasks and use a task runner with .submit, or
> 2) Create deployments for the child flows and kick them off with run_deployment concurrently (these will run as separate flow runs picked up by workers).
or not invalid, but unnecessary
Nate
09/25/2025, 7:59 PM
asyncio.gather should allow concurrent subflows
Nate
09/25/2025, 8:03 PM
"""
Test concurrent child flows using anyio.to_thread.run_sync
"""
import asyncio
import time

import anyio
from prefect import flow


def blocking_work(name: str):
    """Simulate blocking work"""
    print(f"{name} doing blocking work...")
    time.sleep(2)
    return f"{name} result"


@flow
async def child_flow(name: str):
    """Child flow using anyio.to_thread.run_sync for blocking code"""
    print(f"[{time.strftime('%H:%M:%S')}] {name} starting")
    # Using anyio.to_thread.run_sync as Rob mentioned
    result = await anyio.to_thread.run_sync(blocking_work, name)
    print(f"[{time.strftime('%H:%M:%S')}] {name} finished")
    return result


@flow
async def parent_flow():
    """Parent flow using asyncio.gather"""
    print(f"\n[{time.strftime('%H:%M:%S')}] Parent starting concurrent flows...")
    start = time.time()
    results = await asyncio.gather(
        child_flow("Flow-A"), child_flow("Flow-B"), child_flow("Flow-C")
    )
    elapsed = time.time() - start
    print(
        f"[{time.strftime('%H:%M:%S')}] All done in {elapsed:.1f}s (should be ~2s if concurrent)"
    )
    print(f"Results: {results}")


if __name__ == "__main__":
    asyncio.run(parent_flow())
definitely some concurrency happening here
Rob_C
09/25/2025, 8:14 PM
Nate
09/25/2025, 8:23 PM
.result() will block and resolve the future
the rules are mostly the same as prefect 2 except for one big departure
you must resolve terminal futures yourself (or return them, which worked in 2.x as well)
we'll still resolve futures you pass into a task or return directly, but we no longer magically discover unresolved futures when you leave a scope
Rob_C
09/25/2025, 8:46 PM
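For reference, the "async def but do they contain blocking code?" question from earlier in the thread can be checked without Prefect at all. The sketch below is a standard-library illustration only, not Prefect code: blocking_work, blocking_child, offloaded_child, timed, and compare are hypothetical names, and asyncio.to_thread is swapped in for the anyio.to_thread.run_sync call used in the test script above. It assumes Python 3.9+.

```python
import asyncio
import time


def blocking_work(name: str) -> str:
    """Stand-in for blocking code (hypothetical workload)."""
    time.sleep(0.2)
    return f"{name} result"


async def blocking_child(name: str) -> str:
    # Calls the blocking function directly: the event loop is stuck until it
    # returns, so asyncio.gather cannot overlap these coroutines.
    return blocking_work(name)


async def offloaded_child(name: str) -> str:
    # Runs the blocking call in a worker thread and yields to the event loop,
    # so sibling coroutines can make progress in the meantime.
    return await asyncio.to_thread(blocking_work, name)


async def timed(child) -> tuple[list[str], float]:
    """Gather three children and return their results plus elapsed seconds."""
    start = time.perf_counter()
    results = await asyncio.gather(child("A"), child("B"), child("C"))
    return results, time.perf_counter() - start


def compare() -> dict[str, float]:
    """Time both variants; values are wall-clock seconds."""
    _, serial = asyncio.run(timed(blocking_child))
    _, overlapped = asyncio.run(timed(offloaded_child))
    return {"blocking": serial, "offloaded": overlapped}


if __name__ == "__main__":
    print(compare())
```

The blocking variant should take roughly the sum of the three sleeps, while the offloaded variant should take roughly one sleep. That is the same effect the Prefect test script above shows with child flows.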