Tom Han
08/30/2025, 11:54 PM
…run_deployment, is the parent flow of the deployment (sub-sub-flow) the sub-flow or the original flow?
Marvin
08/30/2025, 11:55 PM
When a sub-flow calls run_deployment, the triggered flow run’s parent is that sub-flow. The original/top-level flow is the grandparent.
Notes:
- In Prefect 3.x, run_deployment defaults to as_subflow=True, which attaches the new flow run to the current flow run context (i.e., the caller).
- If you pass as_subflow=False, the triggered flow run will be independent (no parent), regardless of where it’s called.
- Behavior is effectively the same in Prefect 2.x.
Example:
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent():
    sub()

@flow
def sub():
    run_deployment("my-flow/my-deployment")  # parent of this run is 'sub'; 'parent' is the grandparent
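A toy, stdlib-only model can make the parent/grandparent rule concrete. It is not Prefect’s implementation: `RunNode`, `toy_flow`, and `toy_run_deployment` are hypothetical names, and the sketch only assumes what the notes say, namely that the caller is read from the current run context at call time and that as_subflow=False skips the link.

```python
from __future__ import annotations

import contextvars
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RunNode:
    """Hypothetical stand-in for a flow run record: a name plus a parent link."""
    name: str
    parent: Optional[RunNode] = None


# The "currently executing run" lives in a context variable (toy model only).
_current_run: contextvars.ContextVar[Optional[RunNode]] = contextvars.ContextVar(
    "current_run", default=None
)


def toy_flow(fn: Callable[[], None]) -> Callable[[], RunNode]:
    """Hypothetical decorator: each call creates a run whose parent is the caller."""
    def wrapper() -> RunNode:
        node = RunNode(fn.__name__, _current_run.get())
        token = _current_run.set(node)  # this run becomes the context for nested calls
        try:
            fn()
        finally:
            _current_run.reset(token)
        return node
    return wrapper


def toy_run_deployment(name: str, as_subflow: bool = True) -> RunNode:
    """Hypothetical model of as_subflow: link to the caller's run, or to nothing."""
    return RunNode(name, _current_run.get() if as_subflow else None)


triggered: dict[str, RunNode] = {}


@toy_flow
def sub() -> None:
    triggered["attached"] = toy_run_deployment("my-flow/my-deployment")
    triggered["detached"] = toy_run_deployment("my-flow/my-deployment", as_subflow=False)


@toy_flow
def parent() -> None:
    sub()


root = parent()
attached = triggered["attached"]
print(attached.parent.name, attached.parent.parent.name)  # sub parent
print(triggered["detached"].parent)  # None
```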
Refs:
- REST: create flow run from deployment (parent is derived from the current flow run when run as a subflow)
- run_deployment signature shows as_subflow=True by default (confirmed from the code)
Tom Han
08/31/2025, 12:38 AM
Marvin
08/31/2025, 12:41 AM
From an orchestrator flow on machine A, call run_deployment for B, wait for B to finish, then call run_deployment for C.
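Notes on run_deployment behavior:
- By default it creates the new flow run as a subflow of the caller (as_subflow=True).
- If timeout=None (the default), it waits until the run reaches a final state. Set timeout=0 to fire-and-forget. A positive timeout waits at most that many seconds and then returns the flow run in whatever state it is in.
- You can override routing at call time with work_queue_name if needed. The queue is looked up within the deployment’s work pool, so this switches queues, not pools.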
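The timeout rule can be modeled with a small polling loop. This is a stdlib sketch, not Prefect’s code: `wait_for_run` and `read_state` are hypothetical, the set of final state names and the 5-second default poll interval are assumptions for the sketch, and the sleep and clock are injectable so the loop can be exercised without real waiting.

```python
import time
from typing import Callable, Optional

# Assumed set of terminal state names for this sketch.
FINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def wait_for_run(
    read_state: Callable[[], str],
    timeout: Optional[float] = None,
    poll_interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Toy model: timeout=0 returns at once, None polls until a final state,
    and a positive timeout polls for at most that many seconds."""
    state = read_state()
    if timeout == 0:
        return state  # fire-and-forget: report whatever state the run is in now
    start = clock()
    while state not in FINAL_STATES:
        if timeout is not None and clock() - start >= timeout:
            break  # stop waiting; the run itself keeps going on its worker
        sleep(poll_interval)
        state = read_state()
    return state


# Usage with a fake run that finishes after a few polls (no real sleeping).
states = iter(["SCHEDULED", "PENDING", "RUNNING", "COMPLETED"])
print(wait_for_run(lambda: next(states), sleep=lambda _: None))  # COMPLETED
```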
Example orchestrator on A:
from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrate():
    # 1) Trigger B on machine B and wait for completion
    b = run_deployment(
        name="flow-b/deployment-b",
        parameters={"some": "param"},
        timeout=None,  # wait until B finishes (default)
        as_subflow=True,  # keep A as the parent
        # work_queue_name="machine-b-queue",  # optional override
    )
    if not b.state or not b.state.is_completed():
        raise RuntimeError(f"Flow B did not complete successfully: {b.state}")

    # 2) Trigger C on machine C (wait or fire-and-forget)
    c = run_deployment(
        name="flow-c/deployment-c",
        parameters={"upstream_run_id": str(b.id)},
        timeout=0,  # fire-and-forget; use None to wait
        as_subflow=True,
        # work_queue_name="machine-c-queue",
    )
    return {"b_id": str(b.id), "c_id": str(c.id)}
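If the chain grows past B and C, the same wait, check, then pass-the-id pattern can be written as a loop. A stdlib sketch under stated assumptions: `trigger(name, parameters)` is a hypothetical callable you would back with run_deployment(..., timeout=None), `ToyRun` is an illustrative stand-in for the returned flow run, and every step is waited on (unlike C in the orchestrator above, which is fire-and-forget).

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToyRun:
    """Hypothetical stand-in for the flow run returned by a trigger call."""
    id: str
    state: str


def run_chain(deployments: list[str], trigger: Callable[[str, dict], ToyRun]) -> list[ToyRun]:
    """Run deployments in order, passing each run's id to the next one.

    Raises on the first run that does not end COMPLETED, like the
    is_completed() check in the orchestrator.
    """
    runs: list[ToyRun] = []
    upstream_id: Optional[str] = None
    for name in deployments:
        params = {"upstream_run_id": upstream_id} if upstream_id else {}
        run = trigger(name, params)  # assumed to block until the run is final
        runs.append(run)
        if run.state != "COMPLETED":
            raise RuntimeError(f"{name} did not complete successfully: {run.state}")
        upstream_id = run.id
    return runs


# Usage with a fake trigger that records calls instead of contacting a server.
calls: list[tuple[str, dict]] = []


def fake_trigger(name: str, params: dict) -> ToyRun:
    calls.append((name, params))
    return ToyRun(id=f"run-{len(calls)}", state="COMPLETED")


runs = run_chain(["flow-b/deployment-b", "flow-c/deployment-c"], fake_trigger)
print(calls[1])  # ('flow-c/deployment-c', {'upstream_run_id': 'run-1'})
```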
Routing to specific machines
Option A: One pool with per-machine queues
- Create a single process (or docker/k8s) work pool and define queues like machine-a, machine-b, machine-c.
- Start a worker on each machine that polls only its queue.
- Point each deployment to the corresponding queue.
Option B: One pool per machine
- Create process pools machine-a-pool, machine-b-pool, machine-c-pool.
- Start one worker per machine polling its pool.
- Assign each deployment to the appropriate pool.
Helpful CLI (verified):
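# Create a work pool (repeat for each pool you need)
prefect work-pool create "machine-b-pool" --type process
# Create queues inside the pool they should serve (if using queues)
prefect work-queue create "machine-b-queue" --pool "machine-b-pool"
# Start workers, running each command on its own machine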
prefect worker start --pool "machine-b-pool" --work-queue "machine-b-queue"
prefect worker start --pool "machine-c-pool" --work-queue "machine-c-queue"
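The Option A layout can be pictured as a small routing table. This is a toy model, not how the Prefect server stores work: the pool and queue names are hypothetical, `submit` and `poll` are illustrative helpers, and the override branch encodes the assumption that work_queue_name selects a queue within the deployment’s own pool.

```python
from __future__ import annotations

from collections import defaultdict, deque

# Hypothetical Option A layout: one pool, one queue per machine.
DEPLOYMENT_ROUTES: dict[str, tuple[str, str]] = {
    "flow-b/deployment-b": ("machines-pool", "machine-b"),
    "flow-c/deployment-c": ("machines-pool", "machine-c"),
}

# Pending runs keyed by (pool, queue).
pending: defaultdict[tuple[str, str], deque[str]] = defaultdict(deque)


def submit(deployment: str, work_queue_name: str | None = None) -> tuple[str, str]:
    """Queue a run on the deployment's queue, or on another queue in the same pool."""
    pool, queue = DEPLOYMENT_ROUTES[deployment]
    if work_queue_name is not None:
        queue = work_queue_name  # the override never changes the pool
    pending[(pool, queue)].append(deployment)
    return pool, queue


def poll(pool: str, queue: str) -> list[str]:
    """A worker started with --pool/--work-queue only drains its own queue."""
    key = (pool, queue)
    picked = list(pending[key])
    pending[key].clear()
    return picked


submit("flow-b/deployment-b")
submit("flow-c/deployment-c")
print(poll("machines-pool", "machine-b"))  # ['flow-b/deployment-b']
```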
Alternative: Event-driven chaining (no code on A)
If you prefer to decouple, use a Prefect Cloud Automation to trigger C when B completes:
- Trigger: Flow run state entered = Completed, for Deployment = B
- Action: Run a deployment, Deployment = C
This removes the need for A to wait or coordinate: C is started when B’s completion event arrives, so no orchestrator flow has to stay running.
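The trigger/action pair can be modeled as a simple event matcher. This is a stdlib toy, not the Automations API: `Event`, `ToyAutomation`, and the deployment labels are hypothetical, and only the event name prefect.flow-run.Completed follows Prefect’s flow-run event naming.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class Event:
    name: str        # e.g. "prefect.flow-run.Completed"
    deployment: str  # deployment whose flow run emitted the event


@dataclass
class ToyAutomation:
    """Hypothetical model: fire `action` when `expect` arrives for `deployment`."""
    expect: str
    deployment: str
    action: Callable[[Event], None]

    def handle(self, event: Event) -> bool:
        if event.name == self.expect and event.deployment == self.deployment:
            self.action(event)
            return True
        return False


started: list[str] = []
b_done_starts_c = ToyAutomation(
    expect="prefect.flow-run.Completed",
    deployment="flow-b/deployment-b",
    action=lambda event: started.append("flow-c/deployment-c"),  # stands in for "run deployment C"
)

for event in [
    Event("prefect.flow-run.Running", "flow-b/deployment-b"),    # wrong state: ignored
    Event("prefect.flow-run.Completed", "flow-a/deployment-a"),  # wrong deployment: ignored
    Event("prefect.flow-run.Completed", "flow-b/deployment-b"),  # match: C is started
]:
    b_done_starts_c.handle(event)

print(started)  # ['flow-c/deployment-c']
```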
Docs/refs:
- run_deployment API (params like timeout, as_subflow, work_queue_name): Run deployments API
- Creating deployments: Create deployments
- Work pools/workers overview: Blog: Work Pools and Workers
If you share how you’ve set up your work pools/queues today, I can map your exact A/B/C deployments and give you concrete commands and code.
Tom Han
08/31/2025, 3:03 AM
Marvin
08/31/2025, 3:05 AM
Option 1: Read B’s return value from its final state. This works when:
- A waits for B to finish (timeout=None)
- B’s flow result is persisted to shared storage (S3/GCS/Azure/etc.)
- The return value is serializable (set a serializer if needed)
Example:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.states import get_state_result
from prefect.serializers import JSONSerializer

# On machine B (or wherever B runs)
@flow(
    persist_result=True,
    result_serializer=JSONSerializer(),
    # result_storage="s3-bucket/my-results",  # hypothetical block slug; point at your own remote storage
)
def flow_b() -> dict:
    # your logic
    return {"records": 123, "path": "s3://my-bucket/data.json"}

# Orchestrator on machine A
@flow
def orchestrate():
    # Wait for B to complete so its state includes a final result
    b_run = run_deployment("flow-b/deployment-b", timeout=None)
    # Pull B's Python return value from the completed state
    b_output = get_state_result(b_run.state)  # -> {"records": 123, "path": ...}
    # Pass into C as parameters
    run_deployment(
        "flow-c/deployment-c",
        parameters={"b_output": b_output},
        timeout=0,  # fire-and-forget, or None to wait
    )
Notes:
- For cross-machine retrieval, configure B’s flow with remote result storage (e.g., S3/GCS/Azure). Without it, results may be local to machine B and not retrievable by A.
- If your object isn’t JSON-serializable, choose a different serializer (e.g., Pickle) via result_serializer=. Only do this if you trust the storage location (unpickling can execute arbitrary code) and A runs Python and library versions compatible with B’s.
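Before relying on JSON results or parameters, a quick round-trip check can flag values that plain JSON would alter or reject. A conservative stdlib sketch: `is_plain_json` is a hypothetical helper, and Prefect’s JSONSerializer can encode some types the standard json module cannot, so a False here means "look more carefully", not that Prefect will necessarily fail.

```python
import json
from typing import Any


def is_plain_json(value: Any) -> bool:
    """True only if json.dumps/json.loads returns an equal value.

    Catches both hard failures (sets, datetimes raise TypeError) and silent
    changes (tuples become lists, non-string dict keys become strings).
    """
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        return False


print(is_plain_json({"records": 123, "path": "s3://my-bucket/data.json"}))  # True
print(is_plain_json({"ids": {1, 2}}))  # False
print(is_plain_json({1: "a"}))  # False
```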
Option 2: Pass a reference instead of the whole payload
- Have B write its output to durable storage (S3/GCS/etc.) and return a small reference (e.g., a path or ID).
- A reads that reference from B’s state and passes the reference to C.
- C loads the data from the reference.
- This avoids moving large objects through Prefect’s state payloads and keeps parameters simple/JSON-friendly.
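The reference-passing pattern can be sketched with the standard library, using a local temporary directory as a stand-in for shared storage. Assumptions: `write_payload` and `load_payload` are hypothetical helpers standing in for B’s and C’s logic, and a real setup would point at S3/GCS/Azure storage that both machines can reach, not a local path.

```python
import json
import tempfile
import uuid
from pathlib import Path

# Stand-in for shared storage; in practice a bucket both B and C can reach.
STORAGE_ROOT = Path(tempfile.mkdtemp(prefix="shared-storage-"))


def write_payload(records: list) -> dict:
    """B's side: write the full payload and return only a small, JSON-friendly reference."""
    path = STORAGE_ROOT / f"{uuid.uuid4()}.json"
    path.write_text(json.dumps(records))
    return {"path": str(path), "records": len(records)}


def load_payload(ref: dict) -> list:
    """C's side: load the payload from the reference it received as a parameter."""
    return json.loads(Path(ref["path"]).read_text())


ref = write_payload([{"id": 1}, {"id": 2}])
# A would forward only `ref`, e.g. parameters={"b_output": ref}
loaded = load_payload(ref)
print(ref["records"], loaded)  # 2 [{'id': 1}, {'id': 2}]
```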
Lightweight variant using artifacts (optional):
- B creates an artifact (e.g., a link to S3). A reads the link (or you pass it directly as a parameter), and C consumes it. This is nice for observability but not required.
Key APIs/Docs:
- run_deployment (wait, subflow, etc.): Run deployments API
- How-to run deployments: How to run deployments
If you share what storage you’re using (S3/GCS/Azure) and whether you need JSON vs Pickle, I can give you exact code to set result_storage and the serializer for B.