# ask-marvin
@Marvin how to create a deployment with two containers, one doing a numpy 2.0 addition and one doing a numpy 2.10 multiplication, in one flow file? Also, the flow must spawn both containers multiple times.
thought for 346.6 seconds
Short answer: in Prefect 3.x a single deployment runs in one container image. To use two different containers (NumPy 2.0 for addition and NumPy 2.1.0 for multiplication), define two deployments (each with its own image) in the same flow file, then have a third "orchestrator" flow that spawns both deployments multiple times (concurrently if you want).

Before we dive in: did you mean NumPy 2.1.0 rather than 2.10? NumPy uses 2.1.0 (there's no 2.10). I'll use 2.0.0 and 2.1.0 below; tell me if you need different versions.

What this sets up:
- One file with three flows:
  - `add_flow` → runs in a container with NumPy 2.0.0
  - `mul_flow` → runs in a container with NumPy 2.1.0
  - `orchestrator` → spawns multiple runs of both deployments
- Two deployments (add and mul), each pinned to its own Docker image via job variables
- The orchestrator can be run locally or deployed as well

1) Flow code (one file)

Save as `flows.py`:
```
from prefect import flow, task, get_run_logger
from prefect.deployments.flow_runs import run_deployment
from prefect.task_runners import ThreadPoolTaskRunner  # Prefect 3.x replacement for ConcurrentTaskRunner


@task
def add(a: float, b: float) -> float:
    import numpy as np  # imported inside the task: the orchestrator image may not have numpy
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Add task NumPy version: {np.__version__}")
    return float(np.add(a, b))


@task
def mul(a: float, b: float) -> float:
    import numpy as np
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Mul task NumPy version: {np.__version__}")
    return float(np.multiply(a, b))


@flow(name="add-flow")
def add_flow(a: float, b: float) -> float:
    return add(a, b)


@flow(name="mul-flow")
def mul_flow(a: float, b: float) -> float:
    return mul(a, b)


@task
def launch(name: str, parameters: dict, run_suffix: str):
    # Kick off a deployment. Defaults to running as a subflow of this orchestrator.
    return run_deployment(
        name=name,  # "<FLOW_NAME>/<DEPLOYMENT_NAME>"
        parameters=parameters,
        flow_run_name=f"{name.replace('/','-')}-{run_suffix}",
    )


@flow(name="orchestrator", task_runner=ConcurrentTaskRunner())
def orchestrator(repeats: int = 3, a: float = 2, b: float = 3):
    # Launch both deployments 'repeats' times each, concurrently
    futures = []
    for i in range(repeats):
        futures.append(launch.submit("add-flow/add", {"a": a, "b": b}, f"add-{i}"))
        futures.append(launch.submit("mul-flow/mul", {"a": a, "b": b}, f"mul-{i}"))
    # Collect results (FlowRun objects returned by run_deployment)
    return [f.result() for f in futures]
```
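One note on the return value: `run_deployment` waits for the child flow run to reach a final state by default, so the orchestrator gets back finished `FlowRun` objects. A minimal sketch of failing the caller when any child run did not complete (assuming that default waiting behavior; the names match the code above):
```
# Run the orchestrator and inspect the FlowRun objects it returns.
runs = orchestrator(repeats=3, a=2, b=3)
failed = [r.name for r in runs if r.state is None or not r.state.is_completed()]
if failed:
    raise RuntimeError(f"Child flow runs did not complete: {failed}")
```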
2) Build two images (one per NumPy version)

Example Dockerfiles:

Dockerfile.add (NumPy 2.0.0):
```
FROM python:3.11-slim
RUN pip install --no-cache-dir prefect numpy==2.0.0
WORKDIR /app
# Your code will be pulled by the worker from storage or mounted; this image just pins numpy.
```
Dockerfile.mul (NumPy 2.1.0):
```
FROM python:3.11-slim
RUN pip install --no-cache-dir prefect numpy==2.1.0
WORKDIR /app
```
Build and push (adjust repo names as needed):
```
docker build -t your-registry/prefect-numpy-add:2.0.0 -f Dockerfile.add .
docker push your-registry/prefect-numpy-add:2.0.0

docker build -t your-registry/prefect-numpy-mul:2.1.0 -f Dockerfile.mul .
docker push your-registry/prefect-numpy-mul:2.1.0
```

3) Create a Docker work pool and start a worker
```
prefect work-pool create "docker-pool" --type docker
prefect worker start --pool docker-pool
```
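One detail worth noting: the Dockerfiles above only pin NumPy, so the worker still needs your flow code at runtime (pulled from storage or baked into the image). A sketch of the baked-in option, assuming you add `COPY flows.py /app/flows.py` to each Dockerfile and point the deployments' pull step at that directory in the prefect.yaml shown next (`set_working_directory` is a built-in pull step in Prefect 3.x):
```
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /app
```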
4) Define deployments in prefect.yaml

Add two deployments (same file, two containers via `job_variables.image`):
```
name: multi-container-example
prefect-version: ">=3.0.0"

deployments:
  - name: add
    entrypoint: flows.py:add_flow
    work_pool:
      name: docker-pool
      job_variables:
        image: "your-registry/prefect-numpy-add:2.0.0"

  - name: mul
    entrypoint: flows.py:mul_flow
    work_pool:
      name: docker-pool
      job_variables:
        image: "your-registry/prefect-numpy-mul:2.1.0"

  # Optional: deploy the orchestrator too (can use a generic image)
  - name: orchestrator
    entrypoint: flows.py:orchestrator
    work_pool:
      name: docker-pool
      job_variables:
        image: "prefecthq/prefect:3-latest"
```
Then deploy from the project root:
```
prefect deploy
```

5) Run it (spawn both containers multiple times)

- Option A: Run the orchestrator locally (no deployment needed):
```
python -c "from flows import orchestrator; orchestrator(repeats=5, a=2, b=3)"
```
- Option B: Run the orchestrator deployment:
```
prefect deployment run orchestrator/orchestrator -p repeats=5 -p a=2 -p b=3
```
This will:
- Start 5 add runs in the NumPy 2.0.0 image
- Start 5 mul runs in the NumPy 2.1.0 image
- Do it concurrently from one flow file

Notes and references:
- This uses two deployments (each one container image) orchestrated by a parent flow. Prefect 3.x does not run multiple heterogeneous containers inside a single deployment run. The parent-orchestrator pattern is the recommended approach for "multi-container" pipelines.
- You can also override the image at trigger time with job variables if you want to change images dynamically per run (see the sketch below), but pinning a specific image in each deployment is usually clearer.
- Customize job variables docs: Customize job variables (Prefect 3)
- Pattern overview: Scalable microservices orchestration with Prefect and Docker

Want me to turn this into a working repo template (with storage) or switch the worker to Kubernetes instead of Docker?
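For the trigger-time override mentioned in the notes above, a minimal sketch using `run_deployment`'s `job_variables` argument (the alternate image tag here is hypothetical):
```
from prefect.deployments.flow_runs import run_deployment

# Override the deployment's pinned image for this one run only.
run_deployment(
    name="mul-flow/mul",
    parameters={"a": 2, "b": 3},
    job_variables={"image": "your-registry/prefect-numpy-mul:2.1.1"},  # hypothetical tag
)
```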
@Marvin currently I can't use an artifactory; my prefect-server and my prefect-client (where I develop from) are disconnected from the web and there is no possibility to use an image artifactory. But can I push the images to the server another way?
thought for 92.3 seconds
Yep: you can run fully air-gapped without any registry. The Docker worker will use images that are already present on the worker host; if the image tag exists locally it won't try to pull. So the trick is to preload your images onto the worker machine(s) and reference those tags in your deployments.

Two common offline strategies:

1) Preload Docker images on the worker host (no registry)

- On a machine where you can build the images:
```
docker build -t add:2.0.0 -f Dockerfile.add .
docker build -t mul:2.1.0 -f Dockerfile.mul .

docker save add:2.0.0 > add_2.0.0.tar
docker save mul:2.1.0 > mul_2.1.0.tar
```
- Transfer the tarballs to the worker server (USB, secure copy over your internal network, etc.)
- On the worker host:
```
docker load -i add_2.0.0.tar
docker load -i mul_2.1.0.tar
docker images   # verify tags exist
```
- Reference those exact tags in your Prefect deployment job variables. For example in `prefect.yaml`:
```
deployments:
  - name: add
    entrypoint: flows.py:add_flow
    work_pool:
      name: docker-pool
      job_variables:
        image: "add:2.0.0"

  - name: mul
    entrypoint: flows.py:mul_flow
    work_pool:
      name: docker-pool
      job_variables:
        image: "mul:2.1.0"
```
- Start your Docker worker and run your flows as usual:
```
prefect work-pool create "docker-pool" --type docker
prefect worker start --pool docker-pool
prefect deploy
```
The worker will find the local images and run them without pulling.

2) Skip containers and use Process work pools with pre-created virtualenvs

- If Docker is not an option, you can run two deployments in two different Python environments on the same host.
- Create two venvs (e.g., one with numpy==2.0.0 and one with numpy==2.1.0) and configure each deployment to invoke the appropriate interpreter via a wrapper script or the job variables' command/env (see the sketch at the end of this message). This is workable but more manual than Docker; the Docker route is cleaner.

If you're on Kubernetes instead of Docker:
- You can similarly pre-load images onto nodes (or run an internal, air-gapped registry if allowed). When preloading, set the imagePullPolicy to IfNotPresent/Never in your job template so the cluster won't try to pull (also sketched below).

Related docs:
- Customize job variables (where you set `image`): Customize job variables (Prefect 3)

Questions to confirm so I can tailor exact steps:
- Are you using a Docker work pool or Kubernetes?
- Can you transfer files to the worker server(s) via USB/secure copy?
- Do you want me to show the `Dockerfile.add` and `Dockerfile.mul` again with the numpy pins and a minimal `prefect.yaml` targeting local tags only?
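For reference, minimal sketches of the two alternatives above. First, strategy 2 with a Process work pool, assuming its `command` job variable can be pointed at a specific interpreter (the pool name and venv path are hypothetical, and the venv must contain both prefect and the pinned numpy):
```
deployments:
  - name: add
    entrypoint: flows.py:add_flow
    work_pool:
      name: process-pool  # hypothetical Process work pool
      job_variables:
        # Hypothetical venv path with prefect + numpy==2.0.0 installed
        command: "/opt/venvs/np200/bin/python -m prefect.engine"
```
And the Kubernetes pre-load case, assuming the default Kubernetes worker base job template, which exposes an `image_pull_policy` job variable:
```
deployments:
  - name: add
    entrypoint: flows.py:add_flow
    work_pool:
      name: k8s-pool  # hypothetical Kubernetes work pool
      job_variables:
        image: "add:2.0.0"          # pre-loaded on the nodes
        image_pull_policy: "Never"  # never attempt to pull
```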
@Marvin I do not want to skip containers.
thought for 314.9 seconds
Got it: if you don't want to skip containers, make sure your deployment is configured to build and use a Docker image. The quickest way in Prefect 3.x is to pass a DockerImage and set `build=True` (and `push=True` if you push to a registry) when creating the deployment:
```
from prefect import flow
from prefect.flows import Flow
from prefect.docker import DockerImage

@flow
def my_flow():
    ...

# Example: deploying from a repo; works the same for local projects too
Flow.from_source(
    source="<https://github.com/your-org/your-repo>",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="prod",
    work_pool_name="docker-pool",  # or your k8s/ecs/etc. pool
    image=DockerImage(
        name="your-registry/your-image",
        tag="2025-10-07",
        dockerfile="Dockerfile",   # "auto" will try to detect a Dockerfile
    ),
    build=True,   # do NOT skip container build
    push=True,    # push to your registry if needed
)
```
If you were reacting to the interactive `prefect deploy` prompt, choose "No" when it asks "Skip building containers?" so it builds the image.

Useful references:
- Store flow code for deployments
- prefect.yaml configuration

Quick question to make sure I guide you precisely: were you seeing the "Skip building containers?" prompt during `prefect deploy`, or are you configuring deployments in code/`prefect.yaml`?
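Since you're air-gapped, a sketch of the same pattern with a local path as the source and no registry push (the paths and names here are hypothetical):
```
from prefect.flows import Flow
from prefect.docker import DockerImage

Flow.from_source(
    source="/opt/projects/my-flows",   # hypothetical local project directory
    entrypoint="flows.py:add_flow",
).deploy(
    name="add",
    work_pool_name="docker-pool",
    image=DockerImage(name="add", tag="2.0.0", dockerfile="Dockerfile.add"),
    build=True,   # build the image locally
    push=False,   # air-gapped: skip pushing to a registry
)
```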
@Marvin OK, but now I need to execute a container without Prefect installed from one task.
thought for 71.8 seconds
Absolutely: you can run an external container from a Prefect task, and that container does not need Prefect installed. Prefect will orchestrate it from the outside and capture logs/exit code.

Two common approaches:

1) Use the Docker CLI (simple, no extra Python deps)

- Requires the Docker CLI and daemon available where the task runs (e.g., your local machine or a VM with Docker).
```
from prefect import flow, task
import subprocess, shlex

@task
def run_external_container(
    image: str,
    cmd: str | None = None,
    env: dict[str, str] | None = None,
    volumes: dict[str, str] | None = None,  # {"host_path": "/container/path"}
    workdir: str | None = None,
):
    args = ["docker", "run", "--rm"]
    if workdir:
        args += ["-w", workdir]
    if env:
        for k, v in env.items():
            args += ["-e", f"{k}={v}"]
    if volumes:
        for host, container in volumes.items():
            args += ["-v", f"{host}:{container}"]
    args.append(image)
    if cmd:
        args += shlex.split(cmd)

    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    for line in proc.stdout:
        print(line, end="")
    rc = proc.wait()
    if rc != 0:
        raise RuntimeError(f"Container exited with code {rc}")
    return rc

@flow
def orchestrate():
    run_external_container(
        image="alpine:3.20",
        cmd="echo hello from container && uname -a",
        env={"FOO": "bar"},
        volumes={"/tmp": "/mnt"},
    )

if __name__ == "__main__":
    orchestrate()
```
2) Use the Docker SDK for Python

- Requires `pip install docker` in your flow's environment and Docker daemon access.
```
from prefect import flow, task
import docker

@task
def run_external_container_docker_sdk(
    image: str,
    cmd: list[str] | None = None,
    env: dict[str, str] | None = None,
    volumes: dict[str, str] | None = None,  # {"host_path": "/container/path"}
    workdir: str | None = None,
):
    client = docker.from_env()
    # Convert simple {"host": "/container"} to docker SDK mapping
    vol_map = {h: {"bind": c, "mode": "rw"} for h, c in (volumes or {}).items()}

    # Note: auto-remove (remove=True) can race with wait()/logs() on a
    # detached container, so remove it explicitly after collecting results.
    container = client.containers.run(
        image=image,
        command=cmd,
        environment=env,
        volumes=vol_map,
        working_dir=workdir,
        detach=True,
    )
    try:
        for line in container.logs(stream=True, follow=True):
            print(line.decode().rstrip())
        result = container.wait()
    finally:
        container.remove(force=True)

    code = result.get("StatusCode", 1)
    if code != 0:
        raise RuntimeError(f"Container exited with code {code}")
    return code

@flow
def orchestrate():
    run_external_container_docker_sdk(
        image="alpine:3.20",
        cmd=["sh", "-c", "echo hello from container && env | sort"],
        env={"FOO": "bar"},
        volumes={"/tmp": "/mnt"},
    )

if __name__ == "__main__":
    orchestrate()
```
Notes:
- Make sure the runtime where your Prefect task executes has access to Docker. If you're running the flow in Kubernetes or another container environment without Docker-in-Docker, you'll need a different approach (e.g., use the Kubernetes Python client to launch a Job/Pod, or an ECS task if you're on AWS); a sketch follows below.
- The container image you run can be anything; it does not need Prefect installed.
- If you want the task to fail when the container fails, keep the non-zero exit code check/exception.

Quick question: where will this task run (local VM with Docker, Kubernetes, ECS, etc.)? I can tailor the example for your environment.
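For the Kubernetes case in the first note, a minimal sketch using the official kubernetes Python client (`pip install kubernetes`); the job name, namespace, and image here are hypothetical, and this fires the Job without waiting for it:
```
from kubernetes import client, config

def launch_k8s_job():
    # In-cluster config when running inside the cluster;
    # use config.load_kube_config() when running outside it.
    config.load_incluster_config()
    job = client.V1Job(
        metadata=client.V1ObjectMeta(name="external-job"),  # hypothetical name
        spec=client.V1JobSpec(
            backoff_limit=0,
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[
                        client.V1Container(
                            name="main",
                            image="alpine:3.20",
                            command=["sh", "-c", "echo hello from a pod"],
                        )
                    ],
                )
            ),
        ),
    )
    client.BatchV1Api().create_namespaced_job(namespace="default", body=job)
```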