<@ULVA73B9P> I have two flows I want to execute in...
# ask-marvin
t
@Marvin I have two flows I want to execute in sequence
m
thought for 112.3 seconds
Great, there are a few good ways to run Flow B after Flow A in Prefect 3.x. A couple of quick questions so I can tailor this:
- Are these flows deployed (and possibly on different infrastructure/work pools) or just running locally?
- Do you need to pass data from Flow A to Flow B?
- Should Flow B run only if Flow A succeeds, or always?

In the meantime, here are the common patterns:

1) Simple: call flows in sequence within an “orchestrator” flow
- Best if both flows live in the same codebase and can run in the same process.
from prefect import flow, get_run_logger

@flow
def flow_a(x: int) -> int:
    return x + 1

@flow
def flow_b(y: int):
    logger = get_run_logger()
    logger.info(f"Flow B got {y}")

@flow
def orchestrator(x: int = 1):
    a_result = flow_a(x)      # runs first, blocks until complete
    flow_b(a_result)          # runs second
2) Chaining deployed flows: orchestrator uses `run_deployment`
- Best if each flow is its own deployment and may run on different infra/work pools.
- `run_deployment` waits for the child run to reach a final state (or until `timeout` elapses) and returns the `FlowRun`. It does not raise when the child run fails, so check the returned run's state if B must only follow a successful A.
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
def orchestrator(x: int = 1):
    # Run Flow A’s deployment first
    run_deployment(
        name="flow-a/prod",      # format: <flow-name>/<deployment-name>
        parameters={"x": x},
        # as_subflow=True keeps lineage in the UI; you can set False if you prefer
    )

    # Then run Flow B’s deployment
    run_deployment(
        name="flow-b/prod",      # format: <flow-name>/<deployment-name>
        parameters={"y": 42}   # pass inputs you need; if you need A’s output, see note below
    )
Note on passing data between deployments: since these runs may execute in different environments, pass data via a durable store (e.g., write to S3/GCS/a database in A, read in B) or persist a result/artifact and fetch it in B.

3) Zero-orchestrator code: use an Automation to trigger B when A completes
- Create an Automation with a trigger like “When deployment <flow-a/prod> run completes successfully” and an action “Run deployment <flow-b/prod>”.
- This is great for loosely coupled chaining and keeps sequencing in the control plane.
- Docs: Creating deployment-trigger automations

Which approach fits your setup? If you share whether you’re using deployments and whether data needs to flow from A to B, I’ll provide a tailored snippet.
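On the data-passing note above, here is a minimal sketch of the handoff idea, assuming a shared filesystem path stands in for S3/GCS or a database. `write_handoff`, `read_handoff`, and the file name are hypothetical names for illustration, not Prefect APIs. Flow A would call the writer as its last step and Flow B would call the reader as its first.

```python
import json
from pathlib import Path


def write_handoff(path: Path, payload: dict) -> None:
    """Called at the end of Flow A: persist the values Flow B needs."""
    # Write to a temporary file first, then rename, so a reader never
    # sees a half-written file (the rename is atomic on POSIX when both
    # paths are on the same filesystem).
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(payload))
    tmp.replace(path)


def read_handoff(path: Path) -> dict:
    """Called at the start of Flow B: load what Flow A left behind."""
    return json.loads(path.read_text())
```

With an object store the same shape applies: swap the two file operations for an upload and a download, and pass only the location (not the data) as a deployment parameter.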
t
@Marvin both flows are deployed, no data needs to be passed, flow B always follows A, and each can be executed separately.
m
thought for 85.3 seconds
Got it: both are deployments, no data passing, and B should run after A, but each can still be run on its own. Two clean options:

A) Preferred: Automation that runs B when A finishes
- Keeps A and B independent. Any time A’s deployment runs (manually, scheduled, or via API), B is automatically triggered.
- Steps (UI):
  1) Go to Automations → New automation
  2) Trigger: “Deployment run state changes”
     - Filter to Flow A’s deployment
     - Condition: choose “Completed” if you want B only on success, or include Failed/Crashed if you want B regardless
  3) Action: “Run deployment” → select Flow B’s deployment
  4) Save and enable
- Docs: Creating deployment-trigger automations
- Helpful CLI for managing automations after creation:
  - List: `prefect automation ls`
  - Enable/disable: `prefect automation enable <name>` / `prefect automation disable <name>`

B) Orchestrator deployment that calls A then B
- Create a small “chain” flow that calls each deployment in sequence. You’d run this chain when you want A→B together; A and B are still runnable individually.
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
def chain(x: int = 1):
    # Run A
    run_deployment(
        name="flow-a/prod",      # format: <flow-name>/<deployment-name>
        parameters={"x": x},
        # as_subflow=True (default) keeps lineage in the UI
    )

    # Then run B
    run_deployment(
        name="flow-b/prod",
        parameters={"y": 42},
    )
Then create a deployment for `chain` so you can trigger the pair. A and B remain deployable and runnable on their own.

Quick check: should B run only when A succeeds, or even if A fails? If you want “always,” I’ll outline the Automation trigger to include terminal failure states as well. If you share your exact deployment names, I can tailor the examples.
t
@Marvin run_deployment seems to be async. I want the second run to follow a successful execution of the first
m
thought for 58.4 seconds
You can make the sequence deterministic by confirming that A finished successfully before starting B. In Prefect 3.x, `run_deployment(...)` returns a `FlowRun` object; called from a synchronous flow, it blocks until the run reaches a final state or `timeout` elapses. To ensure B only runs after a successful A, wait for A’s run to complete and check its final state:
from prefect import flow
from prefect.client.schemas.objects import StateType
from prefect.deployments.flow_runs import run_deployment

@flow
def chain():
    # Run A. In a sync flow, run_deployment blocks until the run reaches a
    # final state or the timeout (in seconds) elapses, then returns the FlowRun.
    # wait_for_flow_run is a coroutine function, so it is not used here.
    a_run = run_deployment(
        name="flow-a/prod",   # <flow-name>/<deployment-name>
        timeout=3600,
        # parameters={...}     # if you have any
    )

    # Only continue if A completed successfully; a failed, crashed, or
    # still-running (timed-out) A stops the chain here.
    if not a_run.state or a_run.state.type != StateType.COMPLETED:
        raise RuntimeError(f"Flow A did not complete successfully: {getattr(a_run.state, 'type', 'unknown')}")

    # Now start B
    run_deployment(
        name="flow-b/prod",
        # parameters={...}
    )
Notes:
- This keeps A and B as independent deployments; you’re just sequencing them in an orchestrator flow.
- If you’d prefer to avoid code and have B always follow A whenever A runs (manually, scheduled, etc.), create an Automation that triggers “Run deployment B” when deployment A enters the “Completed” state.
- Docs: Creating deployment-trigger automations

If you share your exact deployment names, I can tailor the snippet precisely.
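If the chain grows beyond two deployments, the same "run, check state, continue" pattern can be generalized. The following is only a sketch: `run_chain` and `default_runner` are hypothetical helper names, and the runner is passed in as a parameter so the sequencing logic can be tried without a Prefect server. It assumes each returned run exposes `state.is_completed()`, as Prefect's `FlowRun` state objects do.

```python
from typing import Callable, Iterable, Optional


def default_runner(name: str):
    """Run one deployment by name and block until it finishes."""
    # Imported lazily so run_chain can be exercised without Prefect installed.
    from prefect.deployments import run_deployment

    return run_deployment(name=name)


def run_chain(names: Iterable[str], runner: Optional[Callable] = None) -> list:
    """Run deployments in order; stop at the first run that is not Completed."""
    runner = runner or default_runner
    finished = []
    for name in names:
        flow_run = runner(name)
        state = getattr(flow_run, "state", None)
        # A missing state or any non-Completed state aborts the chain,
        # so later deployments are never started.
        if state is None or not state.is_completed():
            raise RuntimeError(f"{name} did not complete successfully: {state}")
        finished.append(flow_run)
    return finished
```

Inside a synchronous `@flow`, this would be called as `run_chain(["flow-a/prod", "flow-b/prod"])`.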
t
@Marvin How do I make this accept the deployment name instead of the ID?
async with get_client() as client:
    flow_run = await client.create_flow_run_from_deployment(deployment_id="41d0e503-8aae-4f1e-a81f-06b40c7c7d73")
    flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
    print(flow_run.state)
m
thought for 105.5 seconds
In Prefect 3.x you can either:
- Look up the deployment by name, then use its ID
- Or call the helper that runs a deployment by name directly

Name format: `"<flow-name>/<deployment-name>"`

Option A (keep your async client flow, resolve by name):
import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main():
    async with get_client() as client:
        # Resolve the deployment by "<flow-name>/<deployment-name>", then use its ID
        deployment = await client.read_deployment_by_name("my-flow/my-deployment")
        flow_run = await client.create_flow_run_from_deployment(deployment_id=deployment.id)
        flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
        print(flow_run.state)

asyncio.run(main())
Option B (use the helper to run by name):
from prefect.deployments import run_deployment

flow_run = run_deployment(name="my-flow/my-deployment")
# optional: wait for it to finish using your existing helper
flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
print(flow_run.state)
Docs:
- Concepts: Deployments in Prefect 3.x
- API: Deployments Python API
t
@Marvin flow_run = await wait_for_flow_run(flow_run_id=flow_run.id) AttributeError: 'coroutine' object has no attribute 'id' on Option 2
m
thought for 55.7 seconds
You’re seeing that because `run_deployment` is async-compatible. In an async context it returns a coroutine, so you must await it. Fix Option 2 by awaiting both calls:
import asyncio

from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

async def main():
    # In an async context both helpers return coroutines, so await each one
    flow_run = await run_deployment(name="my-flow/my-deployment")
    flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
    print(flow_run.state)

asyncio.run(main())
If you’re in a synchronous context instead, call `run_deployment` without `await`. By default it blocks until the run reaches a final state, so a separate wait is not needed (`wait_for_flow_run` is a coroutine function and has to be awaited):
from prefect.deployments import run_deployment

# timeout=None (the default) polls until the run reaches a final state
flow_run = run_deployment(name="my-flow/my-deployment")
print(flow_run.state)
Reminder: the deployment name format is `"<flow-name>/<deployment-name>"`.

Docs:
- Deployments in Prefect 3.x
- Deployments Python API
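Since the name format trips people up (for example, writing `flow-a:prod` instead of `flow-a/prod`), a small check before calling `run_deployment` can catch a malformed name early. This is a sketch only: `split_deployment_name` is a hypothetical helper, not part of Prefect, and it assumes neither name contains a `/` of its own.

```python
from typing import Tuple


def split_deployment_name(name: str) -> Tuple[str, str]:
    """Split "<flow-name>/<deployment-name>" into its two parts."""
    flow_name, sep, deployment_name = name.partition("/")
    # Reject names with no slash or with an empty part on either side
    if not sep or not flow_name or not deployment_name:
        raise ValueError(
            f"Expected '<flow-name>/<deployment-name>', got {name!r}"
        )
    return flow_name, deployment_name
```

For example, `split_deployment_name("my-flow/my-deployment")` returns `("my-flow", "my-deployment")`, while `"flow-a:prod"` raises a `ValueError`.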
t
@Marvin How do I make a flow fail and specify the error message?
m
thought for 108.5 seconds
Shortest path: raise an exception with your message. Prefect will mark the flow run Failed and use the exception message. Example:
from prefect import flow

@flow
def my_flow():
    # your logic...
    raise RuntimeError("Failed because X wasn't found")
If you want to explicitly set the final state/message without a traceback, return a Failed state from the flow:
from prefect import flow
from prefect.states import Failed

@flow
def my_flow():
    # your logic...
    return Failed(message="Failed because X wasn't found")
Notes:
- The message you provide will appear in the UI and in `flow_run.state.message`.
- Works the same in async flows; just use `async def` and `await` where needed.
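To illustrate the async variant mentioned in the notes, here is a minimal sketch. `InputNotFound`, `load_input`, and `build_flow` are hypothetical names for illustration; the only Prefect call used is the `@flow` decorator, imported lazily so the failing helper can be tried on its own.

```python
import asyncio


class InputNotFound(RuntimeError):
    """Hypothetical error type; its message shows up on the failed run."""


async def load_input(name: str, available: set) -> str:
    # Raise with the exact message you want to see on the failed run
    if name not in available:
        raise InputNotFound(f"Failed because {name} wasn't found")
    return name


def build_flow():
    # Imported lazily so load_input can be tested without Prefect installed
    from prefect import flow

    @flow
    async def my_async_flow(name: str = "X"):
        # An uncaught exception here marks the flow run as Failed
        await load_input(name, available=set())

    return my_async_flow
```

Running it would look like `asyncio.run(build_flow()())`, which should end in a Failed run carrying the exception's message.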