# ask-marvin
@Marvin If a sub-flow calls `run_deployment`, is the parent flow of the deployment (sub-sub-flow) the sub-flow or the original flow?
thought for 76.9 seconds
Short answer: the parent is the immediate caller. So if a sub-flow calls `run_deployment`, the triggered flow run's parent is that sub-flow. The original/top-level flow is the grandparent.

Notes:
- In Prefect 3.x, `run_deployment` defaults to `as_subflow=True`, which attaches the new flow run to the current flow run context (i.e., the caller).
- If you pass `as_subflow=False`, the triggered flow run will be independent (no parent), regardless of where it's called.
- Behavior is effectively the same in Prefect 2.x.

Example:
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent():
    sub()

@flow
def sub():
    run_deployment("my-flow/my-deployment")  # parent of this run is 'sub'; 'parent' is the grandparent
```
Refs:
- REST: create flow run from deployment (parent is derived from the current flow run when run as a subflow)
- `run_deployment` signature shows `as_subflow=True` by default (confirmed from the code)
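For contrast, a minimal sketch of the detached variant described above, reusing the hypothetical `my-flow/my-deployment` deployment:

```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def sub_detached():
    # as_subflow=False: the triggered run is independent -- no parent,
    # no grandparent -- regardless of where this is called from.
    # timeout=0 returns immediately instead of waiting for a final state.
    run_deployment(
        "my-flow/my-deployment",
        as_subflow=False,
        timeout=0,
    )
```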
@Marvin So what is the best way to chain multiple flows? I have a flow that runs on machine A, which triggers a subflow on machine B that talks back to machine A; then I want machine A to send and start a new flow on machine C.
thought for 192.7 seconds
Great question. Here's a reliable pattern for chaining flows across machines A → B → C in Prefect 3.x.

Recommended approach: a single "orchestrator" flow on machine A
- Deploy each flow (A, B, C) to a specific work pool/queue that maps to the target machine.
- Run a worker on each machine polling only its queue/pool.
- Have the flow on A call `run_deployment` for B, wait for B to finish, then call `run_deployment` for C.

Notes on `run_deployment` behavior:
- By default it creates the new flow run as a subflow of the caller (`as_subflow=True`).
- It will wait until the run reaches a final state if `timeout=None` (the default). Set `timeout=0` to fire-and-forget.
- You can override routing at call time using `work_queue_name` if needed.

Example orchestrator on A:
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrate():
    # 1) Trigger B on machine B and wait for completion
    b = run_deployment(
        name="flow-b/deployment-b",
        parameters={"some": "param"},
        timeout=None,          # wait until B finishes (default)
        as_subflow=True        # keep A as the parent
        # work_queue_name="machine-b-queue",  # optional override
    )
    if not b.state or not b.state.is_completed():
        raise RuntimeError(f"Flow B did not complete successfully: {b.state}")

    # 2) Trigger C on machine C (wait or fire-and-forget)
    c = run_deployment(
        name="flow-c/deployment-c",
        parameters={"upstream_run_id": str(b.id)},
        timeout=0,             # fire-and-forget; use None to wait
        as_subflow=True
        # work_queue_name="machine-c-queue",
    )
    return {"b_id": str(b.id), "c_id": str(c.id)}
```
Routing to specific machines

Option A: One pool with per-machine queues
- Create a single process (or docker/k8s) work pool and define queues like `machine-a`, `machine-b`, `machine-c`.
- Start a worker on each machine that polls only its queue.
- Point each deployment to the corresponding queue.

Option B: One pool per machine
- Create process pools `machine-a-pool`, `machine-b-pool`, `machine-c-pool`.
- Start one worker per machine polling its pool.
- Assign each deployment to the appropriate pool (see the deployment sketch after the CLI below).

Helpful CLI (verified):
```bash
# Create a work pool
prefect work-pool create "machine-b-pool" --type process

# Create queues (if using queues)
prefect work-queue create "machine-b-queue"

# Start workers on each machine
prefect worker start --pool "machine-b-pool" --work-queue "machine-b-queue"
prefect worker start --pool "machine-c-pool" --work-queue "machine-c-queue"
Alternative: Event-driven chaining (no code on A)
If you prefer to decouple, use a Prefect Cloud Automation to trigger C when B completes:
- Trigger: flow run state entered = Completed, for Deployment = B
- Action: Create flow run for Deployment = C
This removes the need for A to wait or coordinate and is very robust.

Docs/refs:
- `run_deployment` API (params like `timeout`, `as_subflow`, `work_queue_name`): Run deployments API
- Creating deployments: Create deployments
- Work pools/workers overview: Blog: Work Pools and Workers

If you share how you've set up your work pools/queues today, I can map your exact A/B/C deployments and give you concrete commands and code.
@Marvin I am opting for that, but how do I get the return value of flows running on machine B to machine C?
thought for 137.5 seconds
You've got two good options. The simplest is to have A wait for B to finish, read B's return value, then pass it to C.

Option 1: Get B's return value directly via state
Requirements:
- A waits for B to complete (use `timeout=None`)
- B's flow result is persisted to shared storage (S3/GCS/Azure/etc.)
- The return value is serializable (set a serializer if needed)

Example:
```python
from prefect import flow
from prefect.deployments import run_deployment
from prefect.states import get_state_result
from prefect.serializers import JSONSerializer

# On machine B (or wherever B runs)
@flow(persist_result=True, result_serializer=JSONSerializer())
def flow_b():
    # your logic
    return {"records": 123, "path": "<s3://my-bucket/data.json>"}

# Orchestrator on machine A
@flow
def orchestrate():
    # Wait for B to complete so its state includes a final result
    b_run = run_deployment("flow-b/deployment-b", timeout=None)

    # Pull B's Python return value from the completed state
    b_output = get_state_result(b_run.state)  # -> {"records": 123, "path": ...}

    # Pass into C as parameters
    run_deployment(
        "flow-c/deployment-c",
        parameters={"b_output": b_output},
        timeout=0,       # fire-and-forget or None to wait
    )
```
Notes:
- For cross-machine retrieval, configure B's flow with remote result storage (e.g., S3/GCS/Azure). Without it, results may be local to machine B and not retrievable by A.
- If your object isn't JSON-serializable, choose a different serializer (e.g., Pickle) via `result_serializer=` and ensure the environment trusts that serializer.

Option 2: Pass a reference instead of the whole payload
- Have B write its output to durable storage (S3/GCS/etc.) and return a small reference (e.g., a path or ID).
- A reads that reference from B's state and passes the reference to C.
- C loads the data from the reference.
- This avoids moving large objects through Prefect's state payloads and keeps parameters simple/JSON-friendly.

Lightweight variant using artifacts (optional):
- B creates an artifact (e.g., a link to S3). A reads the link (or you pass it directly as a parameter), and C consumes it. This is nice for observability but not required.

Key APIs/Docs:
- `run_deployment` (wait, subflow, etc.): Run deployments API
- How-to run deployments: How to run deployments

If you share what storage you're using (S3/GCS/Azure) and whether you need JSON vs Pickle, I can give you exact code to set `result_storage` and the serializer for B.
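As a starting point, a minimal sketch of Option 1 with remote result storage, assuming `prefect-aws` is installed and an `S3Bucket` block named `my-results` already exists; the block name, bucket path, and flow C's handling are placeholders:

```python
from prefect import flow
from prefect.serializers import JSONSerializer
from prefect_aws.s3 import S3Bucket  # assumes prefect-aws is installed

# B persists its return value to shared S3 storage so that A (running on
# another machine) can read it back from the flow run's final state.
@flow(
    persist_result=True,
    result_storage=S3Bucket.load("my-results"),  # hypothetical block name
    result_serializer=JSONSerializer(),
)
def flow_b():
    # Keep the return value small and JSON-friendly; large data goes to
    # durable storage and is passed along as a reference (Option 2).
    return {"records": 123, "path": "s3://my-bucket/data.json"}

# C receives B's output (or just the reference) as a plain parameter.
@flow
def flow_c(b_output: dict):
    data_path = b_output["path"]
    print(f"Loading upstream data from {data_path}")
```

Passing only the path keeps C's parameters JSON-friendly, which is the same idea as Option 2 above.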