# ask-marvin
r
@Marvin How can I execute child flows concurrently from a parent flow? I'm using `asyncio.gather` with coroutines, but it still seems to be running concurrently
m
n
Are your child flows actually async? Like, they may be defined as `async def`, but do they contain blocking code?
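Nate's question can be checked with plain asyncio, with no Prefect involved. In this minimal sketch (the coroutine names and the 0.2 s delays are illustrative assumptions), a `time.sleep` call inside an `async def` holds the event loop, so `asyncio.gather` ends up running the children one after another, while `await asyncio.sleep` yields control and lets them overlap.

```python
import asyncio
import time


async def blocking_child(name: str) -> str:
    # time.sleep never yields to the event loop, so no other coroutine runs meanwhile
    time.sleep(0.2)
    return name


async def non_blocking_child(name: str) -> str:
    # await asyncio.sleep suspends this coroutine and lets the loop run the others
    await asyncio.sleep(0.2)
    return name


async def timed_gather(child) -> float:
    # Run three children through asyncio.gather and measure wall-clock time
    start = time.perf_counter()
    await asyncio.gather(child("A"), child("B"), child("C"))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed_gather(blocking_child))
non_blocking_elapsed = asyncio.run(timed_gather(non_blocking_child))
print(f"blocking: {blocking_elapsed:.2f}s, non-blocking: {non_blocking_elapsed:.2f}s")
```

The blocking variant takes roughly the sum of the three delays; the non-blocking one takes roughly a single delay.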
r
but it still seems to be running sequentially* 🙃
n
I'm sure it'll pick up what you mean. Most commonly this happens because people have blocking code in their `async` thing.
r
It is async. There were some sync calls, but I pushed those to `anyio.to_thread.run_sync` calls.
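Rob's fix, offloading sync calls to a worker thread, can be sketched with the standard library. This swaps in `asyncio.to_thread` (Python 3.9+) for the `anyio.to_thread.run_sync` call he used, only so the example has no dependencies; `blocking_work` is a hypothetical stand-in for whatever sync call you have. Every blocking call has to go through the offload, since one left inline still stalls the loop for all the gathered children.

```python
import asyncio
import time


def blocking_work(name: str) -> str:
    # Hypothetical stand-in for a synchronous call (sync SDK, DB driver, etc.)
    time.sleep(0.2)
    return f"{name} result"


async def child(name: str) -> str:
    # Run the sync function in a worker thread and await it, so the event loop
    # stays free to drive the other children in the meantime
    return await asyncio.to_thread(blocking_work, name)


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    results = await asyncio.gather(child("A"), child("B"), child("C"))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

With anyio, the equivalent line inside `child` is `await anyio.to_thread.run_sync(blocking_work, name)`.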
m
Great question, and this trips a lot of folks up.

Short answer:
- Calling a flow from another flow creates a subflow. Subflows execute in the same process as the parent and will run one-at-a-time. Wrapping subflow calls in `asyncio.gather` won't make them parallel.
- To run things concurrently, either:
  1) Move the child logic into tasks and use a task runner with `.submit`, or
  2) Create deployments for the child flows and kick them off with `run_deployment` concurrently (these will run as separate flow runs picked up by workers).

Details and examples:

1) If you just need concurrency inside a single run, use tasks + a task runner
- Best when the "children" don't need to be independent flow runs.
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner  # or ProcessPoolTaskRunner for CPU-bound

@task
def child_task(i):
    # do work
    return i * 2

@flow(task_runner=ThreadPoolTaskRunner())
def parent():
    futures = [child_task.submit(i) for i in range(10)]
    results = [f.result() for f in futures]
    return results
```
2) If you want each child to be its own flow run, use deployments + `run_deployment`
- This scales out across workers and can be done concurrently with `asyncio.gather`.
- By default, `run_deployment` will link the new flow run as a subflow (`as_subflow=True`), but it still runs as its own flow run on a worker.
```python
import asyncio
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
async def parent():
    runs = await asyncio.gather(
        run_deployment(name="my-project/child-a", parameters={"x": 1}),
        run_deployment(name="my-project/child-b", parameters={"x": 2}),
        run_deployment(name="my-project/child-c", parameters={"x": 3}),
    )
    return runs
```
Notes and gotchas:
- If you call `child_flow()` directly from a parent flow, it will run as a subflow in-process and block the parent until it completes.
- If your `run_deployment` calls appear "sequential," make sure you have enough workers/slots in your work pool; if there's only one worker slot, flow runs will queue and execute one after the other.
- For CPU-bound parallelism inside a single run, switch to `ProcessPoolTaskRunner`.
- For global throttling across runs, consider Prefect's concurrency limits: Global concurrency limits
- How-to on running work concurrently: Run work concurrently
- Running deployments programmatically: Run deployments

If you can share a minimal snippet of your parent/child flow code (and whether you've created deployments for the children), I can point out exactly why it's running sequentially and suggest the most direct fix for your setup.
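The worker-slot point can be simulated without Prefect. This is only an analogy: an `asyncio.Semaphore` stands in for a work pool's available slots (an assumption for illustration, not how Prefect actually schedules runs), and each "flow run" is just a 0.2 s sleep. Three runs launched together with `asyncio.gather` still finish one after another when only one slot exists.

```python
import asyncio
import time


async def simulated_run(slots: asyncio.Semaphore, name: str) -> str:
    # A run can only execute while it holds one of the available slots
    async with slots:
        await asyncio.sleep(0.2)  # stand-in for the run's actual work
    return name


async def launch(slot_count: int) -> float:
    # Launch three runs at once and measure how long they take in total
    slots = asyncio.Semaphore(slot_count)
    start = time.perf_counter()
    await asyncio.gather(*(simulated_run(slots, f"run-{i}") for i in range(3)))
    return time.perf_counter() - start


one_slot = asyncio.run(launch(1))     # runs queue behind the single slot
three_slots = asyncio.run(launch(3))  # all three runs overlap
print(f"1 slot: {one_slot:.2f}s, 3 slots: {three_slots:.2f}s")
```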
n
wow marvin is really dropping the ball here
Wrapping subflow calls in
asyncio.gather
won’t make them parallel
true but parallel != concurrent and this is wrong
- To run things concurrently, either:
1) Move the child logic into tasks and use a task runner with
.submit
, or
2) Create deployments for the child flows and kick them off with
run_deployment
concurrently (these will run as separate flow runs picked up by workers).
or not invalid, but unnecessary
asyncio.gather
should allow concurrent subflows
```python
"""
Test concurrent child flows using anyio.to_thread.run_sync
"""

import asyncio
import time

import anyio

from prefect import flow


def blocking_work(name: str):
    """Simulate blocking work"""
    print(f"{name} doing blocking work...")
    time.sleep(2)
    return f"{name} result"


@flow
async def child_flow(name: str):
    """Child flow using anyio.to_thread.run_sync for blocking code"""
    print(f"[{time.strftime('%H:%M:%S')}] {name} starting")

    # Using anyio.to_thread.run_sync as Rob mentioned
    result = await anyio.to_thread.run_sync(blocking_work, name)

    print(f"[{time.strftime('%H:%M:%S')}] {name} finished")
    return result


@flow
async def parent_flow():
    """Parent flow using asyncio.gather"""
    print(f"\n[{time.strftime('%H:%M:%S')}] Parent starting concurrent flows...")

    start = time.time()

    results = await asyncio.gather(
        child_flow("Flow-A"), child_flow("Flow-B"), child_flow("Flow-C")
    )

    elapsed = time.time() - start
    print(
        f"[{time.strftime('%H:%M:%S')}] All done in {elapsed:.1f}s (should be ~2s if concurrent)"
    )
    print(f"Results: {results}")


if __name__ == "__main__":
    asyncio.run(parent_flow())
```
definitely some concurrency happening here
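Nate's distinction between parallel and concurrent shows up with plain asyncio too. In this sketch (names and the 0.1 s delay are illustrative), the gathered coroutines all run on a single thread, so nothing executes in parallel, yet every child starts before any of them finishes, which is what concurrency means here.

```python
import asyncio
import threading

events: list[tuple[str, str, int]] = []


async def child(name: str) -> None:
    # Record the phase and the id of the thread running this coroutine
    events.append((name, "start", threading.get_ident()))
    await asyncio.sleep(0.1)  # yield point: the event loop switches to another child here
    events.append((name, "end", threading.get_ident()))


async def main() -> None:
    await asyncio.gather(child("A"), child("B"), child("C"))


asyncio.run(main())
threads = {ident for _, _, ident in events}
order = [(name, phase) for name, phase, _ in events]
print(f"threads used: {len(threads)}")
print(order)
```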
r
Thank you for the information on this; this is helpful! I had a couple of `.result()` calls that I think were causing problems and that I should otherwise let Prefect resolve. I'm a little new to the Prefect 3 world. One quick question, @Nate: does a `PrefectFuture` returned by `.submit` get resolved when you pass it to a flow, like it does when you pass it to another task? I believe the answer is no, but just to double-check.
n
oh, `.result()` will block and resolve the future.

The rules are mostly the same as Prefect 2, except for one big departure: you must resolve terminal futures yourself (or `return` them, which worked in 2.x as well). We'll still resolve futures you pass into a task or `return` directly, but we no longer magically discover unresolved futures when you leave a scope.

https://youtu.be/k74tEYSK_t8?si=xZEYMTIQTxphObxK&t=222

r
Awesome, this is great. Appreciate the explanations!