Tinh
08/21/2025, 9:45 AM
./input using config.json from outside
{
    "ensemble_member": "00"
}
then do the job and write output to the ./output folder
Now I have 51 ensemble members to run, so I use the following code to instruct the container:
ens_ids = [str(i).zfill(2) for i in range(51)]

@task
def run_mod():
    run_con(image_name = "run_modA:fc:2.1",
            local_path = mount_dir,
            ens_id = ens_ids,
            ens = False)

@flow
def run_model():
    futures = run_mod.map(ens_ids)
    results = [future.result() for future in futures]
    return results
51 containers start at once, but only the output for ens=00 is written.
What did I do wrong, and how can I overcome this?
Marvin
08/21/2025, 9:46 AM
Marvin
08/21/2025, 9:49 AM
- run_mod takes no parameters, and inside it you set ens_id = ens_ids (the entire list), not the single item being mapped.
- Also, in Prefect 3.x both task.submit(...) and .map(...) run tasks concurrently, but .map(ens_ids) hands each task run a single item, so the task has to accept that item as a parameter.
2) All containers are reading the same config.json and writing to the same output path
- If every container mounts ./input/config.json that contains {"ensemble_member": "00"}, they’ll all run for “00”.
- If every container writes to the same ./output with the same filenames, they’ll overwrite each other and you’ll only see one result.
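The overwrite effect described in point 2 can be reproduced without Docker or Prefect. The sketch below is only an illustration: fake_container, run_shared, run_isolated and the file name result.txt are hypothetical stand-ins, not part of the real container or of run_con.

```python
import json
import tempfile
from pathlib import Path


def fake_container(input_dir: Path, output_dir: Path) -> None:
    """Hypothetical stand-in for the container: read config.json, write one result file."""
    member = json.loads((input_dir / "config.json").read_text())["ensemble_member"]
    (output_dir / "result.txt").write_text(f"member {member}\n")


def run_shared(root: Path, ens_ids: list[str]) -> list[str]:
    """Every run uses the same input and output directories (the failing setup)."""
    inp, out = root / "input", root / "output"
    inp.mkdir(parents=True)
    out.mkdir(parents=True)
    (inp / "config.json").write_text(json.dumps({"ensemble_member": "00"}))
    for _ in ens_ids:
        fake_container(inp, out)  # each run overwrites the same result.txt
    return sorted(p.read_text().strip() for p in out.glob("*.txt"))


def run_isolated(root: Path, ens_ids: list[str]) -> list[str]:
    """Every run gets its own input and output directories."""
    results = []
    for ens_id in ens_ids:
        inp, out = root / ens_id / "input", root / ens_id / "output"
        inp.mkdir(parents=True)
        out.mkdir(parents=True)
        (inp / "config.json").write_text(json.dumps({"ensemble_member": ens_id}))
        fake_container(inp, out)
        results.append((out / "result.txt").read_text().strip())
    return results


ens_ids = [str(i).zfill(2) for i in range(51)]
with tempfile.TemporaryDirectory() as tmp:
    shared = run_shared(Path(tmp) / "shared", ens_ids)
    isolated = run_isolated(Path(tmp) / "isolated", ens_ids)

print(len(shared), len(isolated))  # prints "1 51"
```

With shared directories only one result survives, and it is for member 00, which matches the symptom in the question.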
How to fix
- Pass the current ens_id into your task and down into your container invocation.
- Give each run its own config.json (or pass ens_id via env/args) and its own output directory.
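As an analogy for the first fix, here is a small standard-library sketch (ThreadPoolExecutor, not Prefect) of the difference between a function that reaches for the whole list and one that uses only the item it was given. The function names are made up for illustration, and unlike the original run_mod, the buggy variant here accepts an argument and simply ignores it.

```python
from concurrent.futures import ThreadPoolExecutor

ens_ids = [str(i).zfill(2) for i in range(51)]


def run_with_whole_list(_ens_id: str) -> list:
    """Mirrors the bug: the mapped item is ignored and the whole list is used."""
    return ens_ids


def run_with_one_item(ens_id: str) -> str:
    """Mirrors the fix: each call uses only the item it was given."""
    return ens_id


with ThreadPoolExecutor(max_workers=8) as pool:
    # pool.map calls the function once per element and preserves input order
    buggy = list(pool.map(run_with_whole_list, ens_ids))
    fixed = list(pool.map(run_with_one_item, ens_ids))

print(len(buggy[0]), fixed[:3])
```

Every buggy call sees all 51 IDs, while every fixed call sees exactly one.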
Here’s a Prefect 3.x pattern using submit and per-ens input/output dirs:
from pathlib import Path
import json
from prefect import flow, task

ens_ids = [str(i).zfill(2) for i in range(51)]
mount_root = "/abs/path/to/work"  # base dir on the host for mounts

@task
def run_mod(ens_id: str):
    # Create per-ens input/output dirs
    inp_dir = Path(mount_root) / ens_id / "input"
    out_dir = Path(mount_root) / ens_id / "output"
    inp_dir.mkdir(parents=True, exist_ok=True)
    out_dir.mkdir(parents=True, exist_ok=True)
    # Write a per-ens config.json
    (inp_dir / "config.json").write_text(json.dumps({"ensemble_member": ens_id}))
    # Now run your container with per-ens mounts and the specific ens_id
    # NOTE: Adjust this to your actual 'run_con' signature
    return run_con(
        image_name="run_modA:fc:2.1",
        # ensure these mounts point to the per-ens dirs above
        mounts=[
            {"source": str(inp_dir), "target": "/app/input", "type": "bind", "read_only": True},
            {"source": str(out_dir), "target": "/app/output", "type": "bind"},
        ],
        # pass the ens_id explicitly so the container can use it if needed
        env={"ENSEMBLE_MEMBER": ens_id},
        # any other args your function needs...
    )

@flow
def run_model():
    futures = [run_mod.submit(ens_id) for ens_id in ens_ids]
    return [f.result() for f in futures]

if __name__ == "__main__":
    run_model()
Key points:
- The task accepts ens_id: str and you submit one task per value.
- Each run gets a unique input (with a matching config.json) and a unique output directory.
- If your container reads only from ./input/config.json, this guarantees it sees the right ensemble_member.
- If your container supports env vars or CLI args to choose the ensemble member, you can skip writing config files and just pass ens_id via env or command, but still give each run a unique output directory.
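If you control the code inside the container, the env-var option from the last bullet could look roughly like this. It is a sketch under assumptions: resolve_ensemble_member is a hypothetical helper, and ENSEMBLE_MEMBER is just the variable name used in the snippet above, not something your image is known to read.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Mapping, Optional


def resolve_ensemble_member(
    config_path: Path,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Prefer ENSEMBLE_MEMBER from the environment, else fall back to config.json."""
    env = os.environ if env is None else env
    member = env.get("ENSEMBLE_MEMBER")
    if member:
        return member
    return json.loads(config_path.read_text())["ensemble_member"]


# Small demonstration with a temporary config.json
with tempfile.TemporaryDirectory() as tmp:
    config = Path(tmp) / "config.json"
    config.write_text(json.dumps({"ensemble_member": "07"}))
    from_config = resolve_ensemble_member(config, env={})
    from_env = resolve_ensemble_member(config, env={"ENSEMBLE_MEMBER": "12"})

print(from_config, from_env)  # prints "07 12"
```

Letting the env var win keeps the existing config.json workflow intact while allowing a per-run override.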
Questions to clarify so I can tailor this precisely:
- Which Prefect version are you on? (The snippet above assumes Prefect 3.x.)
- What does run_con(...) do exactly, and what parameters does it support (mounts, env, command)?
- Does your container allow overriding the ensemble member via env/args, or must it read ./input/config.json?
- Do your output filenames collide across runs, or can the container write to a per-ens subfolder (e.g., ./output/00, ./output/01, …)?
Helpful references:
- Prefect tasks and concurrency (Prefect 3): Task guide
- Flows and running tasks concurrently: Flows overview
Tinh
08/21/2025, 9:57 AM
input path is fixed and defined inside the container?
Marvin
08/21/2025, 9:57 AM
Marvin
08/21/2025, 10:02 AM
from pathlib import Path
import json
from prefect import flow, task

# Fixed paths inside the container
CONTAINER_INPUT = "/app/input"
CONTAINER_OUTPUT = "/app/output"
# Base host path that you control
HOST_BASE = "/abs/path/to/work"

ens_ids = [str(i).zfill(2) for i in range(51)]

@task
def run_mod(ens_id: str):
    # Prepare per-ens host dirs
    inp = Path(HOST_BASE) / ens_id / "input"
    out = Path(HOST_BASE) / ens_id / "output"
    inp.mkdir(parents=True, exist_ok=True)
    out.mkdir(parents=True, exist_ok=True)
    # Write per-ens config.json in the host input dir
    (inp / "config.json").write_text(json.dumps({"ensemble_member": ens_id}))
    # Start container, mounting per-ens host dirs to the fixed in-container paths
    # Replace this with your wrapper; the key is: different host dirs, same container paths
    return run_con(
        image_name="run_modA:fc:2.1",
        mounts=[
            {"source": str(inp), "target": CONTAINER_INPUT, "type": "bind", "read_only": True},
            {"source": str(out), "target": CONTAINER_OUTPUT, "type": "bind"},
        ],
        # If your container can also read ENSEMBLE_MEMBER, pass it too (optional)
        env={"ENSEMBLE_MEMBER": ens_id},
        name=f"runmod-{ens_id}",
    )

@flow
def run_model():
    futures = [run_mod.submit(e) for e in ens_ids]
    return [f.result() for f in futures]
If you’re using prefect-docker directly, you can pass Docker’s volumes via create_kwargs:
from prefect_docker.containers import create_docker_container, start_docker_container

# ens_id, inp, out, CONTAINER_INPUT and CONTAINER_OUTPUT are the per-ens values
# from the run_mod snippet above
container = create_docker_container(
    image="run_modA:fc:2.1",
    detach=True,
    name=f"runmod-{ens_id}",
    environment={"ENSEMBLE_MEMBER": ens_id},
    # Standard docker-py style volumes mapping
    volumes={
        str(inp): {"bind": CONTAINER_INPUT, "mode": "ro"},
        str(out): {"bind": CONTAINER_OUTPUT, "mode": "rw"},
    },
)
start_docker_container(container.id)
CLI equivalent (to illustrate the idea):
docker run --rm \
  -v /abs/path/to/work/00/input:/app/input:ro \
  -v /abs/path/to/work/00/output:/app/output \
  run_modA:fc:2.1
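To see how the same command varies per ensemble member, here is a small sketch that only builds the argument list and does not start anything. docker_run_args is a hypothetical helper, and /app/input and /app/output are the assumed in-container paths from the example above, not confirmed paths of your image.

```python
from pathlib import PurePosixPath


def docker_run_args(ens_id: str, host_base: str, image: str) -> list[str]:
    """Build the `docker run` argument list for one ensemble member."""
    inp = PurePosixPath(host_base) / ens_id / "input"
    out = PurePosixPath(host_base) / ens_id / "output"
    return [
        "docker", "run", "--rm",
        "-v", f"{inp}:/app/input:ro",  # per-ens host input -> fixed container path
        "-v", f"{out}:/app/output",    # per-ens host output -> fixed container path
        image,
    ]


ens_ids = [str(i).zfill(2) for i in range(51)]
commands = [docker_run_args(e, "/abs/path/to/work", "run_modA:fc:2.1") for e in ens_ids]

print(" ".join(commands[1]))
```

Each list could be handed to subprocess.run(...) if you wanted to launch the containers outside Prefect; only the host side of each -v mapping changes between members.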
Why this works
- Each container is isolated. You can mount different host paths to the same in-container path across different containers.
- This guarantees each container sees only its own config.json at the fixed input path and writes to its own host output directory, avoiding overwrites.
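After a run, a quick check on the host can confirm that every member actually produced output. This is a minimal sketch assuming the <base>/<ens_id>/output layout used above; members_without_output is a hypothetical helper and the file names in the demo are made up.

```python
import tempfile
from pathlib import Path


def members_without_output(host_base: Path, ens_ids: list[str]) -> list[str]:
    """Return ensemble IDs whose output directory is missing or empty."""
    missing = []
    for ens_id in ens_ids:
        out = host_base / ens_id / "output"
        if not out.is_dir() or not any(out.iterdir()):
            missing.append(ens_id)
    return missing


# Demo: 00 and 02 have output, 01 has an empty output dir, 03 has no dir at all
with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    for ens_id in ("00", "01", "02"):
        (base / ens_id / "output").mkdir(parents=True)
    (base / "00" / "output" / "result.txt").write_text("ok\n")
    (base / "02" / "output" / "result.txt").write_text("ok\n")
    missing = members_without_output(base, ["00", "01", "02", "03"])

print(missing)  # prints ['01', '03']
```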
Questions for you
- What are the exact fixed paths inside your container for input/output?
- What does your run_con(...) function accept for mounts? If you share its signature, I can tailor the call precisely.
Refs
- Prefect 3 task/flow patterns and dynamic infra: Submit flows to dynamic infrastructure
- Docker-based Prefect setups: Self-hosted with Docker