Tsvetelin
09/26/2025, 6:10 PM
Marvin
09/26/2025, 6:10 PM
Marvin
09/26/2025, 6:11 PM
```python
from prefect import flow, get_run_logger


@flow
def flow_a(x: int) -> int:
    return x + 1


@flow
def flow_b(y: int):
    logger = get_run_logger()
    logger.info(f"Flow B got {y}")


@flow
def orchestrator(x: int = 1):
    a_result = flow_a(x)  # runs first, blocks until complete
    flow_b(a_result)  # runs second
```
2) Chaining deployed flows: orchestrator uses `run_deployment`
- Best if each flow is its own deployment and may run on different infra/work pools.
- `run_deployment` waits for the child run to reach a final state (or until `timeout`) and returns the `FlowRun`. It does not raise when the child run fails, so check the returned run’s state before starting B if B must only follow a successful A.
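```python
from prefect import flow
from prefect.deployments.flow_runs import run_deployment


@flow
def orchestrator(x: int = 1):
    # Run Flow A’s deployment first; by default this blocks until the run finishes
    a_run = run_deployment(
        name="flow-a/prod",  # format: <flow-name>/<deployment-name>
        parameters={"x": x},
        # as_subflow=True keeps lineage in the UI; you can set False if you prefer
    )
    # run_deployment returns the FlowRun rather than raising, so check its state
    if a_run.state is None or not a_run.state.is_completed():
        raise RuntimeError(f"Flow A did not complete successfully: {a_run.state}")

    # Then run Flow B’s deployment
    run_deployment(
        name="flow-b/prod",
        parameters={"y": 42},  # pass inputs you need; if you need A’s output, see note below
    )
```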
Note on passing data between deployments: since these runs may execute in different environments, pass data via a durable store (e.g., write to S3/GCS/db in A, read in B) or persist a result/artifact and fetch it in B.
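A minimal sketch of that handoff, assuming both runs can reach the same location. A local JSON file stands in for S3/GCS/a database here, and `write_handoff`, `read_handoff`, and the file name are hypothetical names for illustration, not Prefect APIs:

```python
import json
import tempfile
from pathlib import Path


def write_handoff(path: Path, payload: dict) -> None:
    """Called at the end of Flow A: persist the values Flow B needs."""
    path.write_text(json.dumps(payload))


def read_handoff(path: Path) -> dict:
    """Called at the start of Flow B: load what Flow A persisted."""
    return json.loads(path.read_text())


# Stand-in for a durable store; in practice this would be a bucket key or a table row.
handoff_path = Path(tempfile.mkdtemp()) / "flow_a_output.json"

write_handoff(handoff_path, {"a_result": 2})  # inside Flow A
y = read_handoff(handoff_path)["a_result"]    # inside Flow B
```

With this shape, the orchestrator only needs to pass the location to B (for example as a deployment parameter), not the data itself.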
3) Zero-orchestrator code: use an Automation to trigger B when A completes
- Create an Automation with a trigger like “When deployment <flow-a:prod> run completes successfully” and an action “Run deployment <flow-b:prod>”.
- This is great for loosely-coupled chaining and keeps sequencing in the control plane.
- Docs: Creating deployment-trigger automations
Which approach fits your setup? If you share whether you’re using deployments and if data needs to flow from A to B, I’ll provide a tailored snippet.
Tsvetelin
09/26/2025, 6:14 PM
Marvin
09/26/2025, 6:14 PM
Marvin
09/26/2025, 6:15 PM
`prefect automation ls`
- Enable/disable: `prefect automation enable <name>` / `prefect automation disable <name>`
B) Orchestrator deployment that calls A then B
- Create a small “chain” flow that calls each deployment in sequence. You’d run this chain when you want A→B together; A and B are still runnable individually.
```python
from prefect import flow
from prefect.deployments.flow_runs import run_deployment


@flow
def chain(x: int = 1):
    # Run A
    run_deployment(
        name="flow-a/prod",  # format: <flow-name>/<deployment-name>
        parameters={"x": x},
        # as_subflow=True (default) keeps lineage in the UI
    )
    # Then run B
    run_deployment(
        name="flow-b/prod",
        parameters={"y": 42},
    )
```
Then create a deployment for `chain` so you can trigger the pair. A and B remain deployable and runnable on their own.
Quick check: should B run only when A succeeds, or even if A fails? If you want “always,” I’ll outline the Automation trigger to include terminal failure states as well. If you share your exact deployment names, I can tailor the examples.
Tsvetelin
09/27/2025, 11:43 AM
Marvin
09/27/2025, 11:43 AM
Marvin
09/27/2025, 11:44 AM
`run_deployment(...)` returns a `FlowRun` object. To ensure B only runs after a successful A, wait for A’s run to complete and check its final state:
```python
from prefect import flow
from prefect.client.schemas.objects import StateType
from prefect.deployments.flow_runs import run_deployment
from prefect.flow_runs import wait_for_flow_run


@flow
async def chain():
    # Kick off A; timeout=0 returns immediately instead of polling here
    a_run = await run_deployment(
        name="flow-a/prod",  # <flow-name>/<deployment-name>
        timeout=0,
        # parameters={...}  # if you have any
    )

    # Block until A finishes (wait_for_flow_run is a coroutine, so await it)
    a_run = await wait_for_flow_run(a_run.id, timeout=3600)

    # Only continue if A completed successfully
    if not a_run.state or a_run.state.type != StateType.COMPLETED:
        raise RuntimeError(
            f"Flow A did not complete successfully: {getattr(a_run.state, 'type', 'unknown')}"
        )

    # Now start B
    await run_deployment(
        name="flow-b/prod",
        # parameters={...}
    )
```
Notes:
- This keeps A and B as independent deployments; you’re just sequencing them in an orchestrator flow.
- If you’d prefer to avoid code and have B always follow A whenever A runs (manually, scheduled, etc.), create an Automation that triggers “Run deployment B” when deployment A enters the “Completed” state:
- Docs: Creating deployment-trigger automations
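If you later want B to run even when A fails, the success gate can be factored into a small policy check. This is only a sketch: `should_run_b` and the policy names are hypothetical, and the state type names are assumed to match what Prefect reports for finished runs (COMPLETED, FAILED, CRASHED, CANCELLED):

```python
from typing import Optional

# State type names assumed for runs that have finished, one way or another.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def should_run_b(a_state_type: Optional[str], policy: str = "on_success") -> bool:
    """Decide whether to start B given A's final state type.

    policy="on_success": only when A completed.
    policy="always": whenever A reached any terminal state.
    """
    if a_state_type is None:
        return False
    name = a_state_type.upper()
    if policy == "on_success":
        return name == "COMPLETED"
    if policy == "always":
        return name in TERMINAL_STATES
    raise ValueError(f"unknown policy: {policy!r}")
```

In the orchestrator you would pass in the state type of A’s finished run as a string and only call `run_deployment` for B when the helper returns True.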
If you share your exact deployment names, I can tailor the snippet precisely.
Tsvetelin
09/29/2025, 3:59 PM
Marvin
09/29/2025, 4:02 PM
Marvin
09/29/2025, 4:04 PM
`"<flow-name>/<deployment-name>"`
Option A — keep your async client flow, resolve by name:
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run  # keep your existing import if you already have it


async def main():
    async with get_client() as client:
        # Resolve the deployment by "<flow-name>/<deployment-name>", then run it by ID
        deployment = await client.read_deployment_by_name("my-flow/my-deployment")
        flow_run = await client.create_flow_run_from_deployment(deployment_id=deployment.id)
        flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
        print(flow_run.state)


asyncio.run(main())
```
Option B — use the helper to run by name:
```python
from prefect.deployments import run_deployment

flow_run = run_deployment(name="my-flow/my-deployment")
# optional: wait for it to finish using your existing helper
flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
print(flow_run.state)
```
Docs:
- Concepts: Deployments in Prefect 3.x
- API: Deployments Python API
Tsvetelin
09/29/2025, 4:08 PM
Marvin
09/29/2025, 4:08 PM
Marvin
09/29/2025, 4:09 PM
`run_deployment` is async-compatible. In an async context it returns a coroutine, so you must await it.
Fix Option B by awaiting both calls:
```python
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run


async def main():
    flow_run = await run_deployment(name="my-flow/my-deployment")
    flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
    print(flow_run.state)
```
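If you’re in a synchronous context instead, call `run_deployment` without `await`. By default it blocks until the run reaches a final state, so a separate wait is not needed there; `wait_for_flow_run` is a coroutine function and would have to go through `asyncio.run(...)` from sync code:
```python
from prefect.deployments import run_deployment

# Blocks until the flow run reaches a final state (default timeout=None)
flow_run = run_deployment(name="my-flow/my-deployment")
print(flow_run.state)
```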
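The missing-`await` failure mode can be reproduced without Prefect. In this sketch `fake_run_deployment` is a hypothetical stand-in, not the real function; it only shows why an un-awaited call inside async code hands back a coroutine instead of a flow run:

```python
import asyncio
import inspect


async def fake_run_deployment(name: str) -> str:
    """Hypothetical stand-in for an awaitable call such as run_deployment."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return f"flow run for {name}"


async def main():
    # Forgetting `await` gives you a coroutine object, not a result.
    pending = fake_run_deployment("my-flow/my-deployment")
    was_coroutine = inspect.iscoroutine(pending)

    # Awaiting it yields the actual return value.
    result = await pending
    return was_coroutine, result


was_coroutine, result = asyncio.run(main())
```

Accessing `.id` on the un-awaited object is what typically surfaces as an attribute error on a coroutine.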
Reminder: the deployment name format is `"<flow-name>/<deployment-name>"`.
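If you want to catch a malformed name before calling the API, a small check is enough. This is a sketch with a hypothetical helper, `split_deployment_name`, and it assumes neither part may itself contain a slash:

```python
def split_deployment_name(name: str) -> tuple:
    """Split "<flow-name>/<deployment-name>" into its two parts."""
    flow_name, sep, deployment_name = name.partition("/")
    # Require exactly one slash with a non-empty name on each side.
    if not sep or not flow_name or not deployment_name or "/" in deployment_name:
        raise ValueError(
            f'expected "<flow-name>/<deployment-name>", got {name!r}'
        )
    return flow_name, deployment_name
```

For example, `split_deployment_name("flow-a/prod")` gives the flow name and deployment name separately, while a bare `"flow-a"` raises a `ValueError`.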
Docs:
- Deployments in Prefect 3.x
- Deployments Python API
Tsvetelin
09/29/2025, 4:53 PM
Marvin
09/29/2025, 4:53 PM
Marvin
09/29/2025, 4:55 PM
```python
from prefect import flow


@flow
def my_flow():
    # your logic...
    raise RuntimeError("Failed because X wasn't found")
```
If you want to explicitly set the final state/message without a traceback, return a Failed state from the flow:
```python
from prefect import flow
from prefect.states import Failed


@flow
def my_flow():
    # your logic...
    return Failed(message="Failed because X wasn't found")
```
Notes:
- The message you provide will appear in the UI and in `flow_run.state.message`.
- Works the same in async flows; just use `async def` and `await` where needed.