Shubham
10/16/2025, 12:44 PM

Marvin
10/16/2025, 12:46 PM
- Use run_deployment to trigger each step (sequentially or in parallel)
- For data handoff between steps, persist outputs to shared storage (e.g., S3/GCS) and pass pointers/keys as parameters to the next step
Minimal example
1) Define flows in source control (they can live in different repos)
```python
# flows/step_a.py
from prefect import flow, get_run_logger

@flow
def step_a(n: int) -> int:
    get_run_logger().info("Running step A")
    # write to shared storage here if you need to pass data across deployments
    return n * 2
```
```python
# flows/step_b.py
from prefect import flow, get_run_logger

@flow
def step_b(x: int) -> str:
    get_run_logger().info("Running step B")
    return f"result={x}"
```
2) Create deployments, each with its own Docker image and work pool
```python
from prefect.flows import Flow

# Deployment for step A: uses image A and pool A
Flow.from_source(
    source="https://github.com/acme/pipeline-a",
    entrypoint="flows/step_a.py:step_a",
).deploy(
    name="step-a",
    work_pool_name="docker-pool-a",
    job_variables={"image": "ghcr.io/acme/env-a:1.0.0"},
)

# Deployment for step B: uses image B and pool B
Flow.from_source(
    source="https://github.com/acme/pipeline-b",
    entrypoint="flows/step_b.py:step_b",
).deploy(
    name="step-b",
    work_pool_name="docker-pool-b",
    job_variables={"image": "ghcr.io/acme/env-b:2.0.0"},
)
```
3) Orchestrate with a parent flow using run_deployment
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrate():
    # Trigger step A and wait for it (the default when called inside a flow).
    # run_deployment takes "<flow-name>/<deployment-name>" and returns a
    # FlowRun object, not the child flow's return value.
    a_run = run_deployment(name="step-a/step-a", parameters={"n": 21})
    # If you need to pass data, read from shared storage or use artifacts/variables.
    # Example passing a computed value directly:
    run_deployment(name="step-b/step-b", parameters={"x": 42})

if __name__ == "__main__":
    orchestrate()
```
Notes for smooth execution
- Waiting and subflows: `run_deployment(...)` called inside a flow runs “as a subflow” by default and will wait for completion. You can also set `timeout` or `scheduled_time`, and even override infra per run via `job_variables`:
  - `run_deployment(name="step-b/step-b", job_variables={"image": "ghcr.io/acme/env-b:hotfix"})`
- Data passing: since each step runs in a separate container/process, pass data via persistent storage (S3/GCS/Azure Blob) or Prefect artifacts/variables, not in-memory objects; see the first sketch after this list.
- Parallelism: fire off multiple `run_deployment(...)` calls for fan-out; Prefect tracks each as a subflow (see the second sketch after this list).
- Reliability: add retries/timeouts to the child flows (e.g., `@flow(retries=2, retry_delay_seconds=30)`), and consider concurrency limits on work pools to control throughput.
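To make the shared-storage handoff concrete, here is a minimal sketch of the pattern from the data-passing note. The bucket name, object keys, deployment names, and boto3 usage are illustrative assumptions, not part of the deployments above; any object store (GCS, Azure Blob, or a Prefect result-storage block) works the same way.
```python
# Sketch only: assumes an S3 bucket named "acme-pipeline", boto3 installed,
# and that these flows are deployed like the ones above (names hypothetical).
import json

import boto3
from prefect import flow
from prefect.deployments import run_deployment

BUCKET = "acme-pipeline"  # hypothetical bucket

@flow
def step_a_persisted(n: int, output_key: str) -> None:
    # Persist the result to shared storage instead of returning it in memory.
    payload = json.dumps({"value": n * 2})
    boto3.client("s3").put_object(Bucket=BUCKET, Key=output_key, Body=payload)

@flow
def orchestrate_with_handoff() -> None:
    key = "runs/step-a/output.json"  # the pointer handed between deployments
    run_deployment(
        name="step-a-persisted/step-a-persisted",
        parameters={"n": 21, "output_key": key},
    )
    # A step-B variant would take the key as a parameter and read the payload.
    run_deployment(
        name="step-b-persisted/step-b-persisted",
        parameters={"input_key": key},
    )
```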
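And for the parallelism note, a minimal fan-out sketch against the step-a deployment defined above (the loop bounds and parameters are illustrative). Setting `timeout=0` tells `run_deployment` to return as soon as each child run is created rather than waiting for it to finish:
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def fan_out() -> list:
    # timeout=0 makes run_deployment return as soon as each child run is
    # created, so the three runs execute concurrently on their workers.
    runs = [
        run_deployment(name="step-a/step-a", parameters={"n": i}, timeout=0)
        for i in range(3)
    ]
    # Each element is a FlowRun object; monitor completion via the UI or API.
    return [run.id for run in runs]
```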
Where to set up Docker workers
- Create two Docker work pools (one per environment) and start one worker per pool. You can do this in the UI or via the CLI (`prefect work-pool create docker-pool-a --type docker`, then `prefect worker start --pool docker-pool-a`). Make sure the Docker extras are installed in the environment running each worker (`pip install "prefect[docker]"`).
- The deployment’s work_pool_name and job_variables.image determine which image/environment runs each step.
Docs and references
- Docker-based deployments in Prefect 3.x: Docker infrastructure (Prefect 3)
- Orchestrating “microservices-style” pipelines with Prefect and Docker: Blog: Scalable microservices orchestration
A couple of quick questions to tailor this for you: