oleander-whirlwind — 10/06/2025, 1:34 PM

Marvin — 10/06/2025, 1:40 PM
- add_flow → runs in a container with NumPy 2.0.0
- mul_flow → runs in a container with NumPy 2.1.0
- orchestrator → spawns multiple runs of both deployments
- Two deployments (add and mul), each pinned to its own Docker image via job variables
- Orchestrator can be run locally or deployed as well
1) Flow code (one file)
Save as flows.py
```
from prefect import flow, task, get_run_logger
from prefect.deployments.flow_runs import run_deployment
from prefect.task_runners import ConcurrentTaskRunner


@task
def add(a: float, b: float) -> float:
    import numpy as np

    logger = get_run_logger()
    logger.info(f"Add task NumPy version: {np.__version__}")
    return float(np.add(a, b))


@task
def mul(a: float, b: float) -> float:
    import numpy as np

    logger = get_run_logger()
    logger.info(f"Mul task NumPy version: {np.__version__}")
    return float(np.multiply(a, b))


@flow(name="add-flow")
def add_flow(a: float, b: float) -> float:
    return add(a, b)


@flow(name="mul-flow")
def mul_flow(a: float, b: float) -> float:
    return mul(a, b)


@task
def launch(name: str, parameters: dict, run_suffix: str):
    # Kick off a deployment. Defaults to running as a subflow of this orchestrator.
    return run_deployment(
        name=name,  # "<FLOW_NAME>/<DEPLOYMENT_NAME>"
        parameters=parameters,
        flow_run_name=f"{name.replace('/', '-')}-{run_suffix}",
    )


@flow(name="orchestrator", task_runner=ConcurrentTaskRunner())
def orchestrator(repeats: int = 3, a: float = 2, b: float = 3):
    # Launch both deployments 'repeats' times each, concurrently
    futures = []
    for i in range(repeats):
        futures.append(launch.submit("add-flow/add", {"a": a, "b": b}, f"add-{i}"))
        futures.append(launch.submit("mul-flow/mul", {"a": a, "b": b}, f"mul-{i}"))
    # Collect results (FlowRun objects returned by run_deployment)
    return [f.result() for f in futures]
```
2) Build two images (one per NumPy version)
Example Dockerfiles:
Dockerfile.add (NumPy 2.0.0)
FROM python:3.11-slim
RUN pip install --no-cache-dir prefect numpy==2.0.0
WORKDIR /app
# Your code will be pulled by the worker from storage or mounted; this image just pins numpy.
Dockerfile.mul (NumPy 2.1.0)
FROM python:3.11-slim
RUN pip install --no-cache-dir prefect numpy==2.1.0
WORKDIR /app
Build and push (adjust repo names as needed):
docker build -t your-registry/prefect-numpy-add:2.0.0 -f Dockerfile.add .
docker push your-registry/prefect-numpy-add:2.0.0
docker build -t your-registry/prefect-numpy-mul:2.1.0 -f Dockerfile.mul .
docker push your-registry/prefect-numpy-mul:2.1.0
3) Create a Docker work pool and start a worker
prefect work-pool create "docker-pool" --type docker
prefect worker start --pool docker-pool
4) Define deployments in prefect.yaml
Add two deployments (same file, two containers via job_variables.image).
```
name: multi-container-example
prefect-version: ">=3.0.0"
deployments:
  - name: add
    entrypoint: flows.py:add_flow
    work_pool:
      name: docker-pool
      job_variables:
        image: "your-registry/prefect-numpy-add:2.0.0"
  - name: mul
    entrypoint: flows.py:mul_flow
    work_pool:
      name: docker-pool
      job_variables:
        image: "your-registry/prefect-numpy-mul:2.1.0"
  # Optional: deploy the orchestrator too (can use a generic image)
  - name: orchestrator
    entrypoint: flows.py:orchestrator
    work_pool:
      name: docker-pool
```
Then deploy from the project root:
prefect deploy
5) Run it (spawn both containers multiple times)
- Option A: Run the orchestrator locally (no deployment needed)
python -c "from flows import orchestrator; orchestrator(repeats=5, a=2, b=3)"
- Option B: Run the orchestrator deployment
prefect deployment run orchestrator/orchestrator -p repeats=5 -p a=2 -p b=3
This will:
- Start 5 add runs in the NumPy 2.0.0 image
- Start 5 mul runs in the NumPy 2.1.0 image
- Do it concurrently from one flow file
Notes and references
- This uses two deployments (each one container image) orchestrated by a parent flow. Prefect 3.x does not run multiple heterogeneous containers inside a single deployment run. The parent-orchestrator pattern is the recommended approach for “multi-container” pipelines.
- You can also override the image at trigger time with job variables if you want to change images dynamically per run, but pinning a specific image in each deployment is usually clearer.
- Customize job variables docs: Customize job variables (Prefect 3)
- Pattern overview: Scalable microservices orchestration with Prefect and Docker
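For example, a trigger-time override might look like the following. This is a sketch: the `--job-variable` flag exists in recent Prefect CLI versions, but verify the exact flag name against `prefect deployment run --help` for your version, and the image tag shown is hypothetical.

```shell
# Run the 'add' deployment once with a different image, without editing prefect.yaml.
prefect deployment run "add-flow/add" \
  --param a=2 --param b=3 \
  --job-variable image=your-registry/prefect-numpy-add:2.0.1
```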
Want me to turn this into a working repo template (with storage) or switch the worker to Kubernetes instead of Docker?

oleander-whirlwind — 10/06/2025, 1:55 PM

Marvin — 10/06/2025, 1:57 PM
docker build -t add:2.0.0 -f Dockerfile.add .
docker build -t mul:2.1.0 -f Dockerfile.mul .
docker save add:2.0.0 > add_2.0.0.tar
docker save mul:2.1.0 > mul_2.1.0.tar
- Transfer the tarballs to the worker server (USB, secure copy over your internal network, etc.)
- On the worker host:
docker load -i add_2.0.0.tar
docker load -i mul_2.1.0.tar
docker images # verify tags exist
- Reference those exact tags in your Prefect deployment job variables. For example in `prefect.yaml`:
```
deployments:
  - name: add
    entrypoint: flows.py:add_flow
    work_pool:
      name: docker-pool
      job_variables:
        image: "add:2.0.0"
  - name: mul
    entrypoint: flows.py:mul_flow
    work_pool:
      name: docker-pool
      job_variables:
        image: "mul:2.1.0"
```
- Start your Docker worker and run your flows as usual:
prefect work-pool create "docker-pool" --type docker
prefect worker start --pool docker-pool
prefect deploy
The worker will find the local images and run them without pulling.
2) Skip containers and use Process work pools with pre-created virtualenvs
- If Docker is not an option, you can run two deployments in two different Python environments on the same host.
- Create two venvs (e.g., one with numpy==2.0.0 and one with numpy==2.1.0) and configure each deployment to invoke the appropriate interpreter via a wrapper script or job variables’ command/env. This is workable but more manual than Docker; the Docker route is cleaner.
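A sketch of what the venv approach could look like in prefect.yaml, assuming a Process-type work pool and venvs at hypothetical paths under /opt/venvs. The idea relies on the process worker's default job-template command being `python -m prefect.engine`, so swapping the interpreter selects the environment; confirm that default for your Prefect version before relying on it.

```
deployments:
  - name: add
    entrypoint: flows.py:add_flow
    work_pool:
      name: process-pool   # a Process-type work pool (assumed name)
      job_variables:
        # Run the flow with the interpreter from the venv that has numpy==2.0.0
        command: "/opt/venvs/np200/bin/python -m prefect.engine"  # hypothetical path
  - name: mul
    entrypoint: flows.py:mul_flow
    work_pool:
      name: process-pool
      job_variables:
        command: "/opt/venvs/np210/bin/python -m prefect.engine"  # hypothetical path
```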
If you’re on Kubernetes instead of Docker:
- You can similarly pre-load images onto nodes (or run an internal, air-gapped registry if allowed). When preloading, set the imagePullPolicy to IfNotPresent/Never in your job template so the cluster won’t try to pull.
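The pull-policy part of that might look like the fragment below. The container fields follow the standard Kubernetes Job spec; the exact nesting inside a Prefect Kubernetes work pool's base job template can differ by version, so treat this as an illustration.

```
job_configuration:
  job_manifest:
    spec:
      template:
        spec:
          containers:
            - name: prefect-job
              image: "{{ image }}"
              imagePullPolicy: Never   # only use images preloaded onto the node
```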
Related docs:
- Customize job variables (where you set image): Customize job variables (Prefect 3)
Questions to confirm so I can tailor exact steps:
- Are you using a Docker work pool or Kubernetes?
- Can you transfer files to the worker server(s) via USB/secure copy?
Do you want me to show the Dockerfile.add and Dockerfile.mul again with the numpy pins and a minimal prefect.yaml targeting local tags only?

oleander-whirlwind — 10/07/2025, 5:40 AM

Marvin — 10/07/2025, 5:46 AM
Set build=True (and push=True if you push to a registry) when creating the deployment:
```
from prefect import flow
from prefect.flows import Flow
from prefect.docker import DockerImage


@flow
def my_flow():
    ...


# Example: deploying from a repo; works the same for local projects too
Flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="prod",
    work_pool_name="docker-pool",  # or your k8s/ecs/etc. pool
    image=DockerImage(
        name="your-registry/your-image",
        tag="2025-10-07",
        dockerfile="Dockerfile",  # "auto" will try to detect a Dockerfile
    ),
    build=True,  # do NOT skip the container build
    push=True,   # push to your registry if needed
)
```
If you were reacting to the interactive prefect deploy prompt, choose “No” when it asks “Skip building containers?” so it builds the image.
Useful references:
- Store flow code for deployments
- prefect.yaml configuration
Quick question to make sure I guide you precisely: were you seeing the “Skip building containers?” prompt during prefect deploy, or are you configuring deployments in code/`prefect.yaml`?

oleander-whirlwind — 10/07/2025, 11:33 AM

Marvin — 10/07/2025, 11:35 AM
1) Shell out to docker run from a Prefect task

```
from prefect import flow, task
import shlex
import subprocess


@task
def run_external_container(
    image: str,
    cmd: str | None = None,
    env: dict[str, str] | None = None,
    volumes: dict[str, str] | None = None,  # {"host_path": "/container/path"}
    workdir: str | None = None,
):
    args = ["docker", "run", "--rm"]
    if workdir:
        args += ["-w", workdir]
    if env:
        for k, v in env.items():
            args += ["-e", f"{k}={v}"]
    if volumes:
        for host, container in volumes.items():
            args += ["-v", f"{host}:{container}"]
    args.append(image)
    if cmd:
        args += shlex.split(cmd)
    proc = subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    for line in proc.stdout:
        print(line, end="")
    rc = proc.wait()
    if rc != 0:
        raise RuntimeError(f"Container exited with code {rc}")
    return rc


@flow
def orchestrate():
    run_external_container(
        image="alpine:3.20",
        # Wrap in sh -c so '&&' is interpreted by a shell inside the container,
        # not passed to echo as literal arguments
        cmd="sh -c 'echo hello from container && uname -a'",
        env={"FOO": "bar"},
        volumes={"/tmp": "/mnt"},
    )


if __name__ == "__main__":
    orchestrate()
```
2) Use the Docker SDK for Python
- Requires `pip install docker` in your flow’s environment and Docker daemon access.

```
from prefect import flow, task
import docker


@task
def run_external_container_docker_sdk(
    image: str,
    cmd: list[str] | None = None,
    env: dict[str, str] | None = None,
    volumes: dict[str, str] | None = None,  # {"host_path": "/container/path"}
    workdir: str | None = None,
):
    client = docker.from_env()
    # Convert simple {"host": "/container"} to the Docker SDK mapping
    vol_map = {h: {"bind": c, "mode": "rw"} for h, c in (volumes or {}).items()}
    container = client.containers.run(
        image=image,
        command=cmd,
        environment=env,
        volumes=vol_map,
        working_dir=workdir,
        detach=True,  # detach so we can stream logs; clean up explicitly below
    )
    try:
        for line in container.logs(stream=True, follow=True):
            print(line.decode().rstrip())
        result = container.wait()
    finally:
        # remove=True cannot be combined with detach=True in docker-py,
        # so remove the container ourselves after it exits
        container.remove(force=True)
    code = result.get("StatusCode", 1)
    if code != 0:
        raise RuntimeError(f"Container exited with code {code}")
    return code


@flow
def orchestrate():
    run_external_container_docker_sdk(
        image="alpine:3.20",
        cmd=["sh", "-c", "echo hello from container && env | sort"],
        env={"FOO": "bar"},
        volumes={"/tmp": "/mnt"},
    )


if __name__ == "__main__":
    orchestrate()
```
Notes:
- Make sure the runtime where your Prefect task executes has access to Docker. If you’re running the flow in Kubernetes or another container environment without Docker-in-Docker, you’ll need a different approach (e.g., use the Kubernetes Python client to launch a Job/Pod, or an ECS task if you’re on AWS).
- The container image you run can be anything; it does not need Prefect installed.
- If you want the task to fail when the container fails, keep the non-zero exit code check/exception.
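If you end up on Kubernetes, the same pattern translates to creating a Job instead of calling docker run. A minimal sketch of building the Job manifest as a plain dict; the official `kubernetes` client's `create_namespaced_job` accepts dicts as well as typed models. Names, namespace, and TTL values here are illustrative, not prescriptive.

```python
def make_job_manifest(name: str, image: str, command: list[str]) -> dict:
    """Build a minimal Kubernetes Job manifest as a plain dict."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name},
        "spec": {
            "backoffLimit": 0,  # fail fast; let the orchestrating flow decide on retries
            "ttlSecondsAfterFinished": 600,  # garbage-collect finished Jobs
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": name,
                            "image": image,
                            "command": command,
                            "imagePullPolicy": "IfNotPresent",  # prefer preloaded node images
                        }
                    ],
                }
            },
        },
    }


manifest = make_job_manifest("external-step", "alpine:3.20", ["sh", "-c", "echo hi"])
# To submit (requires `pip install kubernetes` and cluster credentials):
#   from kubernetes import client, config
#   config.load_kube_config()
#   client.BatchV1Api().create_namespaced_job(namespace="default", body=manifest)
```

The task wrapper would then poll the Job's status the same way the Docker examples wait on the container's exit code.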
Quick question: where will this task run (local VM with Docker, Kubernetes, ECS, etc.)? I can tailor the example for your environment.