<@ULVA73B9P> I have a docker container performing ...
# marvin-ai
@Marvin I have a docker container performing one computational task (modelling). Each container reads input from `./input` using a `config.json` supplied from outside:

```json
{
  "ensemble_member": "00"
}
```

It then does the job and writes output to the `./output` folder. Now I have 51 ensemble members to run, so I use the following to instruct the containers:
```python
ens_ids = [str(i).zfill(2) for i in range(51)]

@task
def run_mod():
    run_con(image_name="run_modA:fc:2.1",
            local_path=mount_dir,
            ens_id=ens_ids,
            ens=False)

@flow
def run_model():
    futures = run_mod.map(ens_ids)
    results = [future.result() for future in futures]
    return results
```
51 containers start at once, but only the output for `ens=00` is written. What did I do wrong, and how can I overcome this?
thought for 212.6 seconds
A few things are likely going on here:

1) The mapped argument isn't actually being passed into your task
- In your snippet, `run_mod` takes no parameters, and inside it you set `ens_id = ens_ids` (the entire list), not the single item being mapped.
- Also, in Prefect 3.x you generally use `task.submit(...)` for concurrency rather than `.map(...)` (a mapped variant is sketched after the example below).

2) All containers are reading the same config.json and writing to the same output path
- If every container mounts `./input/config.json` that contains `{"ensemble_member": "00"}`, they'll all run for "00".
- If every container writes to the same `./output` with the same filenames, they'll overwrite each other and you'll only see one result.

How to fix:
- Pass the current `ens_id` into your task and down into your container invocation.
- Give each run its own config.json (or pass `ens_id` via env/args) and its own output directory.

Here's a Prefect 3.x pattern using `submit` and per-ens input/output dirs:
```python
from pathlib import Path
import json
from prefect import flow, task

ens_ids = [str(i).zfill(2) for i in range(51)]
mount_root = "/abs/path/to/work"  # base dir on the host for mounts

@task
def run_mod(ens_id: str):
    # Create per-ens input/output dirs
    inp_dir = Path(mount_root) / f"{ens_id}" / "input"
    out_dir = Path(mount_root) / f"{ens_id}" / "output"
    inp_dir.mkdir(parents=True, exist_ok=True)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Write a per-ens config.json
    (inp_dir / "config.json").write_text(json.dumps({"ensemble_member": ens_id}))

    # Now run your container with per-ens mounts and the specific ens_id
    # NOTE: Adjust this to your actual 'run_con' signature
    return run_con(
        image_name="run_modA:fc:2.1",
        # ensure these mounts point to the per-ens dirs above
        mounts=[
            {"source": str(inp_dir), "target": "/app/input", "type": "bind", "read_only": True},
            {"source": str(out_dir), "target": "/app/output", "type": "bind"},
        ],
        # pass the ens_id explicitly so the container can use it if needed
        env={"ENSEMBLE_MEMBER": ens_id},
        # any other args your function needs...
    )

@flow
def run_model():
    futures = [run_mod.submit(ens_id) for ens_id in ens_ids]
    return [f.result() for f in futures]

if __name__ == "__main__":
    run_model()
```
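If you prefer to keep `.map`, the same pattern works as long as the task accepts a single `ens_id` (Prefect 3 passes each element of the iterable to its own task run). A minimal sketch reusing the `run_mod` task above; only the flow body changes:

```python
@flow
def run_model():
    # .map submits one run_mod call per element of ens_ids
    futures = run_mod.map(ens_ids)
    return [f.result() for f in futures]
```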
Key points:
- The task accepts `ens_id: str` and you submit one task per value.
- Each run gets a unique input (with a matching config.json) and a unique output directory.
- If your container reads only from `./input/config.json`, this guarantees it sees the right `ensemble_member`.
- If your container supports env vars or CLI args to choose the ensemble member, you can skip writing config files and just pass `ens_id` via `env` or `command` (a quick sketch follows this list), but still give each run a unique output directory.
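For illustration, that env-only variant could look like the following drop-in version of `run_mod`; it reuses `mount_root` from the example above and still assumes the same hypothetical `run_con` signature (mounts/env), so adjust it to whatever your wrapper actually accepts:

```python
@task
def run_mod(ens_id: str):
    # No per-ens config.json: the ensemble member is passed via the environment
    out_dir = Path(mount_root) / ens_id / "output"
    out_dir.mkdir(parents=True, exist_ok=True)

    return run_con(
        image_name="run_modA:fc:2.1",
        mounts=[{"source": str(out_dir), "target": "/app/output", "type": "bind"}],
        env={"ENSEMBLE_MEMBER": ens_id},  # the container must read this itself
    )
```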
Questions to clarify so I can tailor this precisely:
- Which Prefect version are you on? (The snippet above assumes Prefect 3.x.)
- What does `run_con(...)` do exactly, and what parameters does it support (mounts, env, command)?
- Does your container allow overriding the ensemble member via env/args, or must it read `./input/config.json`?
- Do your output filenames collide across runs, or can the container write to a per-ens subfolder (e.g., `./output/00`, `./output/01`, …)?

Helpful references:
- Prefect tasks and concurrency (Prefect 3): Task guide
- Flows and running tasks concurrently: Flows overview
@Marvin Thanks, what if the input path is fixed and defined inside the container?
thought for 325.5 seconds
Totally fine: if the container's input path is fixed (e.g., "/app/input"), you can still run 51 in parallel by giving each container a different host directory and mounting that host directory to the same fixed path inside each container.

What to do:
- Create per-ensemble host directories for input and output.
- Write a per-ensemble config.json into each input dir.
- For each run, bind-mount that host input dir to the fixed in-container input path, and the host output dir to the fixed in-container output path.
- Submit one task per ensemble ID so they run concurrently.

Example (Prefect 3.x pattern):
```python
from pathlib import Path
import json
from prefect import flow, task

# Fixed paths inside the container
CONTAINER_INPUT = "/app/input"
CONTAINER_OUTPUT = "/app/output"

# Base host path that you control
HOST_BASE = "/abs/path/to/work"

ens_ids = [str(i).zfill(2) for i in range(51)]

@task
def run_mod(ens_id: str):
    # Prepare per-ens host dirs
    inp = Path(HOST_BASE) / ens_id / "input"
    out = Path(HOST_BASE) / ens_id / "output"
    inp.mkdir(parents=True, exist_ok=True)
    out.mkdir(parents=True, exist_ok=True)

    # Write per-ens config.json in the host input dir
    (inp / "config.json").write_text(json.dumps({"ensemble_member": ens_id}))

    # Start container, mounting per-ens host dirs to the fixed in-container paths
    # Replace this with your wrapper; the key is: different host dirs, same container paths
    return run_con(
        image_name="run_modA:fc:2.1",
        mounts=[
            {"source": str(inp), "target": CONTAINER_INPUT, "type": "bind", "read_only": True},
            {"source": str(out), "target": CONTAINER_OUTPUT, "type": "bind"},
        ],
        # If your container can also read ENSEMBLE_MEMBER, pass it too (optional)
        env={"ENSEMBLE_MEMBER": ens_id},
        name=f"runmod-{ens_id}"
    )

@flow
def run_model():
    futures = [run_mod.submit(e) for e in ens_ids]
    return [f.result() for f in futures]
```
If you're using prefect-docker directly, you can pass Docker's volumes via create_kwargs (the snippet below assumes `ens_id`, `inp`, and `out` from the task above are in scope):
```python
from prefect_docker.containers import create_docker_container, start_docker_container

container = create_docker_container(
    image="run_modA:fc:2.1",
    detach=True,
    name=f"runmod-{ens_id}",
    environment={"ENSEMBLE_MEMBER": ens_id},
    # Standard docker-py style volumes mapping
    volumes={
        str(inp): {"bind": CONTAINER_INPUT, "mode": "ro"},
        str(out): {"bind": CONTAINER_OUTPUT, "mode": "rw"},
    },
)
start_docker_container(container.id)
```
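To connect the dots, here is a rough sketch of how those two calls could sit inside the per-ens task from the example above (reusing `HOST_BASE`, `CONTAINER_INPUT`, and `CONTAINER_OUTPUT`). Note that `start_docker_container` returns once the container has started, so add your own wait or log-collection step if the flow needs to block until the model finishes:

```python
from pathlib import Path
import json

from prefect import task
from prefect_docker.containers import create_docker_container, start_docker_container

@task
def run_mod(ens_id: str):
    # Per-ens host dirs and config, as before
    inp = Path(HOST_BASE) / ens_id / "input"
    out = Path(HOST_BASE) / ens_id / "output"
    inp.mkdir(parents=True, exist_ok=True)
    out.mkdir(parents=True, exist_ok=True)
    (inp / "config.json").write_text(json.dumps({"ensemble_member": ens_id}))

    container = create_docker_container(
        image="run_modA:fc:2.1",
        detach=True,
        name=f"runmod-{ens_id}",
        environment={"ENSEMBLE_MEMBER": ens_id},
        volumes={
            str(inp): {"bind": CONTAINER_INPUT, "mode": "ro"},
            str(out): {"bind": CONTAINER_OUTPUT, "mode": "rw"},
        },
    )
    # Starts the container and returns immediately; it does not wait for completion
    return start_docker_container(container.id)
```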
CLI equivalent (to illustrate the idea):
```bash
docker run --rm \
  -v /abs/path/to/work/00/input:/app/input:ro \
  -v /abs/path/to/work/00/output:/app/output \
  run_modA:fc:2.1
```
Why this works:
- Each container is isolated. You can mount different host paths to the same in-container path across different containers.
- This guarantees each container sees only its own config.json at the fixed input path and writes to its own host output directory, avoiding overwrites.

Questions for you:
- What are the exact fixed paths inside your container for input/output?
- What does your `run_con(...)` function accept for mounts? If you share its signature, I can tailor the call precisely.

Refs:
- Prefect 3 task/flow patterns and dynamic infra: Submit flows to dynamic infrastructure
- Docker-based Prefect setups: Self-hosted with Docker