<@ULVA73B9P> I have a flow that calls a large numb...
# ask-marvin
g
@Marvin I have a flow that calls a large number of subflows within a loop. Currently, the default behavior of Prefect is to execute these subflows synchronously, but I want to perform this asynchronously, similar to triggering subflows with a task.submit() pattern. Within Prefect’s architecture, what is the recommended way to achieve this?
m
Great question. In Prefect 3.x, calling a flow from another flow is synchronous by design. To fan out subflows concurrently (like task.submit), the recommended pattern is to trigger the child flow as a deployment and return immediately. Here are the key patterns:
- Recommended: trigger deployments asynchronously with run_deployment
  - Create a deployment for the child flow
  - In the parent flow, call `prefect.deployments.run_deployment(..., timeout=0, as_subflow=True)` inside your loop
  - `timeout=0` returns immediately so you can launch many children without waiting
  - `as_subflow=True` links them in the UI as subflows of the parent

Example:
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent(ids: list[int]):
    subflow_runs = []
    for i in ids:
        fr = run_deployment(
            name="child/child-deployment",     # "<flow_name>/<deployment_name>"
            parameters={"i": i},
            timeout=0,                         # do not wait; return immediately
            as_subflow=True,                   # link to parent in UI
            # tags=["fanout"], work_queue_name="...", job_variables={"...": "..."}
        )
        subflow_runs.append(fr)

    # Optionally: persist `fr.id` values somewhere to track/wait later
    return [fr.id for fr in subflow_runs]
- Waiting for completion (optional)
  - You can poll for terminal states using the Prefect client and the returned flow run IDs
  - Rough outline:
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import StateType  # module where the StateType enum is defined

async def wait_for_flow_runs(run_ids):
    terminal = {StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED, StateType.CRASHED}
    async with get_client() as client:
        pending = set(run_ids)
        while pending:
            next_pending = set()
            for rid in list(pending):
                fr = await client.read_flow_run(rid)
                # You can check fr.state_type or fr.state.type depending on your Prefect version
                stype = getattr(fr, "state_type", None) or fr.state.type
                if stype not in terminal:
                    next_pending.add(rid)
            if not next_pending:
                break
            pending = next_pending
            await asyncio.sleep(5)
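If you want the wait to give up after a while and report what each run ended as, here is a minimal sketch of the same polling idea with a deadline. It is stdlib-only and not a Prefect API: `wait_with_deadline` and the `read_state` callable are hypothetical names, and `read_state` is assumed to be something you supply that returns a state name such as "COMPLETED" for a given run ID.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict, Iterable

# Terminal state names, mirroring the set used in the outline above.
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


async def wait_with_deadline(
    run_ids: Iterable[str],
    read_state: Callable[[str], Awaitable[str]],  # hypothetical reader you supply
    poll_seconds: float = 5.0,
    timeout_seconds: float = 3600.0,
) -> Dict[str, str]:
    """Poll until every run is terminal or the deadline passes.

    Returns {run_id: last_seen_state_name}. Runs still in flight at the
    deadline keep their last non-terminal state.
    """
    last_seen: Dict[str, str] = {rid: "UNKNOWN" for rid in run_ids}
    deadline = time.monotonic() + timeout_seconds
    while True:
        pending = [rid for rid, s in last_seen.items() if s not in TERMINAL]
        if not pending:
            break
        # Read all pending runs concurrently instead of one at a time.
        states = await asyncio.gather(*(read_state(rid) for rid in pending))
        last_seen.update(zip(pending, states))
        if all(s in TERMINAL for s in states) or time.monotonic() >= deadline:
            break
        await asyncio.sleep(poll_seconds)
    return last_seen
```

In a Prefect setup, `read_state` could wrap `client.read_flow_run(rid)` from the outline above and return the name of the run's state type. That wiring is an assumption and is left out here so the sketch stays runnable on its own.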
Notes and tips:
- Why deployments? Subflows triggered as deployments can be picked up by your work pool/worker fleet and run in parallel across infrastructure. Calling subflows directly inside tasks only parallelizes within the task runner’s resources and still ties execution to the parent’s worker process.
- Concurrency control: If you’re launching many children, consider adding a concurrency limit on the deployment or work queue to avoid overload.
- Cancellation/parent-child behavior: `as_subflow=True` links children to the parent for visibility and some lifecycle propagation. If you truly want them detached, set `as_subflow=False`.
- Fire-and-forget: If you don’t need to wait, just store the returned FlowRun IDs (or skip storing) and let the workers handle them.

Does this match your use case? Are you planning to wait for all subflows to finish, or is fire-and-forget sufficient? Also, do you already have the child flow packaged as a deployment, or would you like a quick example of creating one with `flow.from_source(...).deploy(...)`?
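On the concurrency-control note: besides a limit on the deployment or work queue, you can also throttle from the parent side. This is a minimal sketch of that client-side alternative, using only asyncio. It is not a Prefect feature: `bounded_fanout` and `run_child` are hypothetical names, and `run_child` is assumed to be an async callable that starts one child and waits for it to finish (if it only launches and returns, the limit caps launches, not running children).

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def bounded_fanout(
    items: Iterable[T],
    run_child: Callable[[T], Awaitable[R]],  # hypothetical: run one child to completion
    limit: int = 10,
) -> List[R]:
    """Call run_child for every item with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: T) -> R:
        # Each call holds one semaphore slot for its whole duration.
        async with semaphore:
            return await run_child(item)

    # gather returns results in the same order as the input items.
    return await asyncio.gather(*(guarded(item) for item in items))
```

A server-side limit is usually the more robust choice because it also covers runs triggered from elsewhere. The sketch above only bounds what a single parent starts.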