<@ULVA73B9P> If i have a flow running which starts...
# marvin-ai
p
@Marvin If i have a flow running which starts multiple flows, I want a way that you start these new flows but my orchestrator flow doesn't stop I just want to schedule the flow using Python
m
thought for 808.1 seconds
Great question. In Prefect 3.x, the simplest way to “fire-and-forget” other flows from an orchestrator flow is to trigger their deployments with
run_deployment
. This creates/schedules the downstream flow runs and returns immediately, so your orchestrator can keep going (or even finish) without waiting. Here are practical patterns you can copy/paste. Fire-and-forget a single flow run (now)
Copy code
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
async def orchestrator():
    # Triggers immediately and returns a FlowRun object; does NOT wait for completion
    child_run = await run_deployment(
        name="my-child-flow/production",     # "flow_name/deployment_name" or a deployment UUID
        parameters={"x": 1},
        flow_run_name="child-1",
        as_subflow=False                     # optional: decouple from parent for full independence
    )
    return {"child_run_id": child_run.id}
Schedule a run for the future
Copy code
from datetime import datetime, timedelta, timezone
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
async def orchestrator():
    scheduled_at = datetime.now(timezone.utc) + timedelta(minutes=10)

    child_run = await run_deployment(
        name="my-child-flow/backfill",
        parameters={"date": "2025-09-10"},
        scheduled_time=scheduled_at,
        as_subflow=False
    )
    return {"scheduled_child_run_id": child_run.id}
Trigger many runs concurrently (still non-blocking for run completion)
Copy code
import asyncio
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
async def orchestrator(partitions: list[int]):
    # Schedule many at once; this only waits for creation, not completion
    tasks = [
        run_deployment(
            name="my-child-flow/etl",
            parameters={"partition": p},
            as_subflow=False
        )
        for p in partitions
    ]
    child_runs = await asyncio.gather(*tasks)
    return [fr.id for fr in child_runs]
Notes and tips -
as_subflow=False
fully decouples the child runs from the parent; they will not be cancelled if the parent is cancelled and will not be displayed as subflows in the UI. If you want them shown as subflows in the UI but still not wait for completion, you can leave the default (
True
);
run_deployment
does not wait by default. - You can also pass
tags
,
work_queue_name
, and
job_variables
to influence routing and infrastructure:
Copy code
await run_deployment(
      name="flow/deployment",
      parameters={"foo": "bar"},
      tags=["priority:high"],
      work_queue_name="my-queue",
      job_variables={"env": {"MY_VAR": "123"}},
      as_subflow=False
  )
Related docs - Creating and using deployments: How to create deployments - Event-driven alternatives with Automations (if you want events to kick off flows instead of Python): - Event triggers concept - Creating deployment triggers If you share how you’re identifying the target deployments and whether you want them linked as subflows in the UI, I can tailor the example further.