Riya Sinha
10/23/2025, 4:38 PM
Marvin
10/23/2025, 4:38 PM
Marvin
10/23/2025, 4:43 PM
- Use .submit() to run tasks concurrently.
- If the task body is blocking (e.g., time.sleep, CPU work) inside an async task, offload it with asyncio.to_thread(...) or Prefect’s run_sync_in_worker_thread(...).
- If you want multiple subflows to overlap, run the subflows concurrently (separate flow runs) instead of calling them directly.
Details and examples below.
1) Run tasks concurrently inside each subflow
- Replace direct/awaited task calls with .submit() and then gather results.
- Ensure your flow has a concurrent task runner (ThreadPool for I/O-bound, ProcessPool for CPU-bound).
Example:
import time

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def do_io(i):
    time.sleep(180)  # simulates blocking I/O
    return i

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def subflow(items):
    futures = [do_io.submit(i) for i in items]  # concurrent
    return [f.result() for f in futures]  # wait when you need results
Guide: Run work concurrently and Task runners
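If you want to see the submit-then-collect pattern in isolation, here is a minimal standard-library sketch. It assumes a thread-pool task runner behaves roughly like `concurrent.futures.ThreadPoolExecutor` for blocking I/O; `fake_io` and `run_all` are hypothetical names, and the sleep is shortened to 0.2 s so it finishes quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(i: int) -> int:
    """Hypothetical stand-in for a blocking I/O call."""
    time.sleep(0.2)
    return i


def run_all(items: list[int], max_workers: int = 8) -> tuple[list[int], float]:
    """Submit every item first, then collect results in submission order."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Work is queued immediately; up to max_workers calls run at once.
        futures = [pool.submit(fake_io, i) for i in items]
        # Block only at the point where the results are needed.
        results = [f.result() for f in futures]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run_all([1, 2, 3, 4])
    print(results, f"{elapsed:.2f}s")
```

The elapsed time should land close to one sleep rather than the sum of all of them, which is the same effect you are after with .submit() inside the subflow.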
2) If you have an async task that does blocking work, offload the blocking part
If your task is async def but calls a blocking function, that will freeze the event loop and block everything else. Offload it:
import asyncio
import time

from prefect import task
from prefect.utilities.asyncutils import run_sync_in_worker_thread

def blocking():
    time.sleep(180)
    return "done"

@task
async def my_async_task():
    # Preferred in Prefect:
    return await run_sync_in_worker_thread(blocking)
    # or Python stdlib:
    # return await asyncio.to_thread(blocking)
API ref: prefect.utilities.asyncutils
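To make the event-loop point concrete, here is a small sketch that uses only asyncio (no Prefect). It runs three copies of a blocking call two ways: called directly inside a coroutine, and offloaded with asyncio.to_thread. The names `calls_blocking_directly`, `offloads_blocking`, `timed`, and `compare` are hypothetical, and the 0.2 s sleep stands in for the 3-minute work.

```python
import asyncio
import time


def blocking(delay: float = 0.2) -> str:
    """Hypothetical blocking call (stands in for the long sleep)."""
    time.sleep(delay)
    return "done"


async def calls_blocking_directly() -> str:
    # Runs on the event loop thread, so no other coroutine can progress.
    return blocking()


async def offloads_blocking() -> str:
    # Runs in a worker thread; the event loop stays free.
    return await asyncio.to_thread(blocking)


async def timed(coro_factory, n: int = 3) -> float:
    """Run n copies under asyncio.gather and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(coro_factory() for _ in range(n)))
    return time.perf_counter() - start


def compare() -> tuple[float, float]:
    """Return (elapsed when blocking the loop, elapsed when offloading)."""
    direct = asyncio.run(timed(calls_blocking_directly))
    offloaded = asyncio.run(timed(offloads_blocking))
    return direct, offloaded


if __name__ == "__main__":
    direct, offloaded = compare()
    print(f"direct: {direct:.2f}s, offloaded: {offloaded:.2f}s")
```

The direct version takes roughly the sum of the three sleeps even though gather is used, because each call holds the loop until it returns. The offloaded version should take about one sleep.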
3) If you want subflows themselves to run in parallel
Calling subflows directly runs them synchronously in the same process. To overlap subflows, submit multiple flow runs concurrently:
- Option A: Use `prefect.runner.submit_to_runner(...)`:
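from prefect import flow, runner
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def child(n):
    # do concurrent tasks inside if needed
    ...

@flow
def parent():
    # One child flow run is submitted per parameter dict.
    runner.submit_to_runner(child, parameters=[{"n": 3}, {"n": 5}, {"n": 7}])
    # wait_for_submitted_runs takes filters, not a list of runs; called with
    # no arguments inside a flow, it waits for that flow run's child runs.
    runner.wait_for_submitted_runs()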
API ref: submit_to_runner
- Option B: Deploy child flows and trigger multiple runs (true parallel infra), then wait for them.
Docs: Create deployments
Which should you use?
- If the 3‑minute work is I/O-bound and you’re okay with threads: use .submit() with ThreadPoolTaskRunner.
- If it’s CPU-bound: use ProcessPoolTaskRunner.
- If you’ve written async code but call blocking functions: use run_sync_in_worker_thread or asyncio.to_thread.
- If you want subflows on separate infra in parallel: trigger multiple flow runs (submit_to_runner or deployments).
Could you share a minimal snippet of:
- how you call the subflows in the parent flow,
- how the long-running task is defined (sync vs async), and
- whether the work is I/O-bound or CPU-bound?
With that, I can tailor a precise fix.