<@ULVA73B9P> i have a flow that launches a few sub...
# marvin-ai
r
@Marvin i have a flow that launches a few subflows. in each subflow, there's a task that's awaited, that takes ~3 min. instead of that task executing in parallel across each of the flows, it blocks all other execution and tasks. what do i have to do? should i use asyncio submit task? or asyncio to thread?
m
thought for 286.4 seconds
Short answer: you’re likely blocking the event loop or running tasks sequentially. To get parallelism in Prefect 3.x:
- Don’t “await” tasks; use `.submit()` to run tasks concurrently.
- If the task body is blocking (e.g., `time.sleep`, CPU work) inside an async task, offload it with `asyncio.to_thread(...)` or Prefect’s `run_sync_in_worker_thread(...)`.
- If you want multiple subflows to overlap, run the subflows concurrently (separate flow runs) instead of calling them directly.

Details and examples below.

1) Run tasks concurrently inside each subflow
- Replace direct/awaited task calls with `.submit()` and then gather results.
- Ensure your flow has a concurrent task runner (ThreadPool for I/O-bound, ProcessPool for CPU-bound).

Example:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def do_io(i):
    import time
    time.sleep(180)  # simulates blocking I/O
    return i

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def subflow(items):
    futures = [do_io.submit(i) for i in items]      # concurrent
    return [f.result() for f in futures]            # wait when you need results
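To see why submitting everything first and collecting results afterwards matters, here is a minimal stdlib-only sketch of the same pattern. It uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for a Prefect task runner (an assumption for illustration, not Prefect's internals), and `fake_io` is a hypothetical placeholder that sleeps 0.2 s instead of 3 minutes.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(i):
    """Hypothetical stand-in for the blocking 3-minute task (shortened to 0.2 s)."""
    time.sleep(0.2)
    return i


def run_sequential(items):
    # Calling the function directly waits for each item before starting the next.
    return [fake_io(i) for i in items]


def run_submitted(items, max_workers=8):
    # Submit everything first, then collect: the sleeps overlap in worker threads.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fake_io, i) for i in items]
        return [f.result() for f in futures]


def timed(fn, *args):
    # Return the function's result together with its wall-clock duration.
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


items = [1, 2, 3, 4]
seq_result, seq_elapsed = timed(run_sequential, items)
sub_result, sub_elapsed = timed(run_submitted, items)
print(f"sequential: {seq_elapsed:.2f}s, submitted: {sub_elapsed:.2f}s")
```

Calling `.result()` immediately after each `submit` would serialize the work again, which is why the futures are all created before any result is requested.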
Guide: Run work concurrently and Task runners

2) If you have an async task that does blocking work, offload the blocking part
If your task is `async def` but calls a blocking function, that will freeze the event loop and block everything else. Offload it:
from prefect import task
from prefect.utilities.asyncutils import run_sync_in_worker_thread
import asyncio  # only needed for the asyncio.to_thread alternative below
import time

def blocking():
    time.sleep(180)  # simulates a blocking call
    return "done"

@task
async def my_async_task():
    # Preferred in Prefect: 
    return await run_sync_in_worker_thread(blocking)
    # or Python stdlib:
    # return await asyncio.to_thread(blocking)
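The freeze described above can be reproduced without Prefect. The following is a minimal sketch using plain `asyncio` (Python 3.9+ for `asyncio.to_thread`); `blocking_call` is a hypothetical stand-in for the 3-minute work, shortened to 0.2 s, and the three gathered coroutines play the role of the same task running in three subflows.

```python
import asyncio
import time


def blocking_call():
    """Hypothetical stand-in for the blocking work (shortened to 0.2 s)."""
    time.sleep(0.2)
    return "done"


async def blocks_the_loop():
    # Runs on the event loop thread: no other coroutine can progress meanwhile.
    return blocking_call()


async def offloaded():
    # Runs in a worker thread; the event loop stays free for other coroutines.
    return await asyncio.to_thread(blocking_call)


async def time_three(coro_fn):
    # Start three copies together and measure how long they take overall.
    start = time.perf_counter()
    results = await asyncio.gather(coro_fn(), coro_fn(), coro_fn())
    return results, time.perf_counter() - start


blocked_results, blocked_elapsed = asyncio.run(time_three(blocks_the_loop))
offloaded_results, offloaded_elapsed = asyncio.run(time_three(offloaded))
print(f"blocking: {blocked_elapsed:.2f}s, offloaded: {offloaded_elapsed:.2f}s")
```

Even though all three coroutines are gathered, the blocking version runs them one after another, because each one holds the event loop for its full duration. Only the offloaded version overlaps.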
API ref: prefect.utilities.asyncutils

3) If you want subflows themselves to run in parallel
Calling subflows directly runs them synchronously in the same process. To overlap subflows, submit multiple flow runs concurrently:
- Option A: Use `prefect.runner.submit_to_runner(...)`:
from prefect import flow, runner
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def child(n):
    # do concurrent tasks inside if needed
    ...

@flow
def parent():
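    # Submits one child flow run per parameter set
    runs = runner.submit_to_runner(child, parameters=[{"n": 3}, {"n": 5}, {"n": 7}])
    # wait_for_submitted_runs takes optional filters, not a list of runs;
    # called with no arguments inside a flow, it waits for that flow's submitted child runs
    runner.wait_for_submitted_runs()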
API ref: submit_to_runner
- Option B: Deploy child flows and trigger multiple runs (true parallel infra), then wait for them. Docs: Create deployments

Which should you use?
- If the 3-minute work is I/O-bound and you’re okay with threads: use `.submit()` with `ThreadPoolTaskRunner`.
- If it’s CPU-bound: use `ProcessPoolTaskRunner`.
- If you’ve written async code but call blocking functions: use `run_sync_in_worker_thread` or `asyncio.to_thread`.
- If you want subflows on separate infra in parallel: trigger multiple flow runs (`submit_to_runner` or deployments).

Could you share a minimal snippet of:
- how you call the subflows in the parent flow,
- how the long-running task is defined (sync vs async), and
- whether the work is I/O-bound or CPU-bound?

With that, I can tailor a precise fix.