Slackbot
08/21/2025, 9:43 AM
Marvin
08/21/2025, 9:43 AM
Marvin
08/21/2025, 10:13 AM
ens_ids list. Instead, parameterize the task so each run launches one container for one ID.
2) Map or submit the task over your IDs
- Use .map(...) if you like the functional style.
- Or use .submit(...) in a loop for more control.
3) Control how many run concurrently
If you run this on one machine, you usually don’t want to start all 51 at once. Use ThreadPoolTaskRunner(max_workers=...) to cap local concurrency.
4) Avoid input/output races
If every container reads ./input/config.json and writes ./output, give each run its own mount directory so they don’t clobber each other.
Here’s a complete example that:
- creates a per-ensemble mount directory,
- writes a per-ensemble config.json,
- launches your container runner,
- and caps concurrency to 8 at a time.
from prefect import flow, task, unmapped
from prefect.task_runners import ThreadPoolTaskRunner
from pathlib import Path
import json

# Your container runner
# def run_con(image_name: str, local_path: str, ens_id: str, ens: bool): ...

@task(retries=1)  # optional retries
def run_mod(ens_id: str, base_mount: str):
    # Create per-ens mount dir
    mount = Path(base_mount) / f"ens_{ens_id}"
    (mount / "input").mkdir(parents=True, exist_ok=True)
    (mount / "output").mkdir(parents=True, exist_ok=True)
    # Write per-ens config
    cfg = {"ensemble_member": ens_id}
    (mount / "input" / "config.json").write_text(json.dumps(cfg))
    # Launch one container for this ens_id
    return run_con(
        image_name="run_modA:fc:2.1",
        local_path=str(mount),
        ens_id=ens_id,
        ens=False,
    )

@flow(name="WaterGAP: Model Computation",
      log_prints=True,
      task_runner=ThreadPoolTaskRunner(max_workers=8))
def run_watergap(base_mount: str):
    ens_ids = [f"{i:02d}" for i in range(51)]
    # Option A: mapping
    futures = run_mod.map(ens_ids, base_mount=unmapped(base_mount))
    # Option B: submit in a loop
    # futures = [run_mod.submit(e, base_mount) for e in ens_ids]
    # Wait for completion
    return [f.result() for f in futures]

# Example local call
# run_watergap("/data/watergap")
Notes and caveats
- Concurrency: The example caps to 8 concurrent containers. If you truly want 51 at once on the same host, set max_workers=51 (be careful with CPU, RAM, disk, and Docker daemon load).
- Defaults: If you omit max_workers, Prefect defers to the underlying thread pool’s default. It’s generally not tuned for launching lots of heavy external processes.
- Global concurrency limits: If you use Prefect Cloud/Server and want to coordinate concurrency across multiple runs or machines, you can add a global limit and gate the task with the concurrency context manager. See Global concurrency limits.
- Task runners: More about task runners in Prefect 3.x here: Task runners.
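To make the effect of max_workers concrete, here is a minimal stdlib-only sketch (plain concurrent.futures, not Prefect code). It pushes 51 dummy jobs through a thread pool and records the peak number running at once. fake_container, run_capped, and the sleep duration are placeholders standing in for a real docker run.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()
_running = 0
_peak = 0

def fake_container(ens_id: str) -> str:
    """Placeholder for a container launch; it only tracks concurrency."""
    global _running, _peak
    with _lock:
        _running += 1
        _peak = max(_peak, _running)
    time.sleep(0.01)  # stand-in for the real work
    with _lock:
        _running -= 1
    return ens_id

def run_capped(ens_ids, max_workers: int = 8):
    """Run all jobs with at most max_workers at once; return (results, peak)."""
    global _running, _peak
    _running = _peak = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(fake_container, ens_ids))
    return results, _peak

ens_ids = [f"{i:02d}" for i in range(51)]
results, peak = run_capped(ens_ids, max_workers=8)
print(len(results), "jobs finished; peak concurrency:", peak)
```

The peak never exceeds max_workers, which is the property that protects the host when each job is a heavy container.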
Questions to make sure this fits:
- Do you want all 51 containers to start at once, or should we cap them to avoid overwhelming the host?
- Are you running locally or via a deployment with a work pool (Docker/Kubernetes/etc.)?
- Is it okay to use separate mount directories per ensemble member as shown?
Tinh
08/21/2025, 12:01 PM
def run_con(image_name: str, local_path: str, ens_id: str, ens: bool = False):
    cmd = ["docker", "run"]
    cmd += ["-v", f"{local_path}:/outlast"]
    if ens:
        cmd += [
            "-e", f"ENS_ID={ens_id}",
            "-v", f"{local_path}/Input/InputWaterGAP:/outlast/Input/InputWaterGAP:ro",  # :ro to avoid read conflicts among containers
        ]
    cmd += [image_name]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise Exception(f"Docker run failed: {result.stderr}")
    print(result.stdout)
    return result.stdout
and below is my task and flow
@task
def run_fc(ens_id: str):
    return run_con(image_name="modA:2.1.2",
                   local_path=mount_dir,
                   ens_id=ens_id,
                   ens=True)
@flow
def run_mod():
    ens_ids = [str(i).zfill(2) for i in range(51)]
    futures = run_fc.map(ens_ids)
    results = [future.result() for future in futures]
    return results
This works perfectly with Prefect 3.2.6.
However, I don't know why some containers exit right after starting (returning no results) yet show the status "completed".
How can I check and handle this?
Marvin
08/21/2025, 12:01 PM
Marvin
08/21/2025, 12:02 PM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect import get_run_logger
import subprocess, os

def run_con(image_name: str, local_path: str, ens_id: str, ens: bool = False, timeout: int = 0):
    # Make the container name unique (handy for debugging)
    container_name = f"modA_{ens_id}"
    cmd = ["docker", "run", "--rm", "--name", container_name]
    cmd += ["-v", f"{local_path}:/outlast"]
    if ens:
        cmd += [
            "-e", f"ENS_ID={ens_id}",
            "-v", f"{local_path}/Input/InputWaterGAP:/outlast/Input/InputWaterGAP:ro",
        ]
    cmd += [image_name]
    # Optional timeout to avoid hangs
    run_kwargs = dict(capture_output=True, text=True)
    if timeout and timeout > 0:
        run_kwargs["timeout"] = timeout
    result = subprocess.run(cmd, **run_kwargs)
    if result.returncode != 0:
        raise RuntimeError(f"Docker run failed (ens_id={ens_id}): {result.stderr}")
    return result.stdout, result.stderr

@task(retries=2, retry_delay_seconds=30)
def run_fc(ens_id: str, base_dir: str):
    logger = get_run_logger()
    # Run one container
    stdout, stderr = run_con(
        image_name="modA:2.1.2",
        local_path=base_dir,
        ens_id=ens_id,
        ens=True,
        timeout=0,  # set e.g. 3600 to enforce a 1hr timeout
    )
    # Log output for observability
    if stdout:
        logger.info(f"[{ens_id}] STDOUT:\n{stdout}")
    if stderr:
        logger.info(f"[{ens_id}] STDERR:\n{stderr}")
    # Post-run validation: check that expected output exists
    # Adjust these checks to your actual outputs
    out_dir = os.path.join(base_dir, "Output", ens_id)  # or shared dir if that’s your design
    # If you’re writing to a shared folder, check for expected files by pattern instead
    produced_anything = os.path.exists(out_dir) and any(os.scandir(out_dir))
    if not produced_anything:
        # Treat as a failure so Prefect marks task as failed and retries if configured
        raise RuntimeError(f"[{ens_id}] Container exited but no outputs were produced at {out_dir}")
    return {"ens_id": ens_id, "output_dir": out_dir}

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))  # tune concurrency as needed
def run_mod(base_dir: str):
    ens_ids = [f"{i:02d}" for i in range(51)]
    futures = run_fc.map(ens_ids, base_dir=base_dir)
    results = [f.result() for f in futures]
    return results
Why this helps
- If a container exits quickly with code 0 but doesn’t produce the expected files, the task now raises, marking the task as Failed and (optionally) retrying.
- You’ll see stdout/stderr in Prefect logs for each ens_id to understand why it exited quickly (e.g., already-done checks, missing input, permissions, etc.).
- max_workers=8 reduces resource contention; adjust to your host capacity.
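For the shared-folder case mentioned in the code comment above, a pattern-based check could look roughly like this. It is a sketch only: has_outputs is a hypothetical helper, and the "*.nc4" default and the example pattern are guesses about the output naming, so adjust them to the files your model actually writes.

```python
from pathlib import Path

def has_outputs(out_dir, pattern: str = "*.nc4", min_files: int = 1) -> bool:
    """Return True if at least min_files non-empty files in out_dir match pattern."""
    out = Path(out_dir)
    if not out.is_dir():
        return False
    # Ignore zero-byte files, which often indicate an aborted write
    matches = [p for p in out.glob(pattern) if p.is_file() and p.stat().st_size > 0]
    return len(matches) >= min_files
```

Inside the task this would replace the directory-is-not-empty test, for example has_outputs(os.path.join(base_dir, "Output"), f"*_{ens_id}.nc4"), assuming the ensemble ID appears in the filename.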
Optional improvements
- Prevent output collisions: if multiple containers write the same filenames to a shared Output directory, some may exit early if they detect “work already done”. If that’s not desired, mount per-ensemble subdirectories:
- Change the docker volume to -v {base_dir}/ens_{ens_id}:/outlast and write outputs under that.
Marvin
08/21/2025, 12:02 PM
Tinh
08/21/2025, 12:17 PM
OSError: [Errno 9] Bad file descriptor: '/project/input/static_inputdata_22e_elevrange.nc4
Marvin
08/21/2025, 12:17 PM
Marvin
08/21/2025, 12:23 PM
import subprocess
from pathlib import Path

def run_con(image_name: str, local_path: str, ens_id: str, ens: bool = False, timeout: int = 0):
    # Host paths
    host_input = Path(local_path) / "Input" / "InputWaterGAP"
    host_output = Path(local_path) / "Output" / ens_id
    host_output.mkdir(parents=True, exist_ok=True)
    # Sanity check: input exists on host before we run the container
    if not host_input.exists():
        raise FileNotFoundError(f"Host input directory not found: {host_input}")
    cmd = ["docker", "run", "--rm", "--name", f"modA_{ens_id}"]
    # Mount inputs to the path your app expects INSIDE the container
    # Add :z on SELinux systems (Fedora/RHEL) -> f"...:/project/input:ro,z"
    cmd += ["-v", f"{host_input}:/project/input:ro"]
    # Mount per-ens output to avoid collisions
    cmd += ["-v", f"{host_output}:/project/output"]
    if ens:
        cmd += ["-e", f"ENS_ID={ens_id}"]
    cmd += [image_name]
    run_kwargs = dict(capture_output=True, text=True)
    if timeout and timeout > 0:
        run_kwargs["timeout"] = timeout
    result = subprocess.run(cmd, **run_kwargs)
    if result.returncode != 0:
        raise RuntimeError(f"Docker run failed (ens_id={ens_id}): {result.stderr}")
    return result.stdout
Quick preflight to verify the mount is correct
Run this once to confirm the container sees the file at the expected path:
docker run --rm \
-v "<LOCAL_PATH>/Input/InputWaterGAP:/project/input:ro" \
<your_image> \
sh -lc "ls -l /project/input && test -f /project/input/static_inputdata_22e_elevrange.nc4 && echo OK"
If that prints OK, the mount path is correct. If not, either the file doesn’t exist at the host path, casing differs (Linux is case-sensitive), or the container is looking in a different directory.
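The host-side half of that check can also be done from Python before any container starts. The sketch below uses a hypothetical helper, preflight_input, and only inspects the host: it cannot tell whether the user inside the container can read the file, so treat it as a first filter and not a replacement for the docker run test.

```python
import os
from pathlib import Path

def preflight_input(host_input_dir, filename: str) -> list:
    """Return a list of problems found on the host; an empty list means the file looks usable."""
    problems = []
    d = Path(host_input_dir)
    f = d / filename
    if not d.is_dir():
        problems.append(f"input directory missing: {d}")
    elif not f.is_file():
        # Look for the same name with different casing to help spot case mismatches
        lookalikes = [p.name for p in d.iterdir() if p.name.lower() == filename.lower()]
        hint = f" (found different casing: {lookalikes})" if lookalikes else ""
        problems.append(f"input file missing: {f}{hint}")
    elif not os.access(f, os.R_OK):
        problems.append(f"input file not readable by current user: {f}")
    return problems
```

Calling it once in the flow, before mapping over the ensemble IDs, turns a missing or misnamed input into one clear error instead of 51 failed containers.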
Extra tips
- SELinux: On Fedora/RHEL with SELinux enforcing, add :z to the bind mount so the container is allowed to read: …:/project/input:ro,z
- Working directory assumptions: If your app uses relative paths, set the working dir with -w /project or adjust your app to use absolute paths.
- Permissions: If the files are owned by a user that the container process can’t read, run the container as a compatible user (e.g., add --user $(id -u):$(id -g) and ensure permissions are at least 0444 on inputs).
- Concurrency: Reading the same inputs from 51 containers is fine with :ro, but make sure outputs are per-ensemble to prevent conflicts (as shown above).
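The tips above can be folded into one command builder so each option is a switch. This is a sketch under assumptions: build_docker_cmd and its parameter names are hypothetical, the in-container paths /project/input and /project/output are taken from the earlier example, and only the docker flags already mentioned in this thread (-v, -w, --user, -e, the :ro and :z mount options) are used.

```python
from typing import List, Optional

def build_docker_cmd(image: str, host_input: str, host_output: str, ens_id: str,
                     selinux: bool = False, workdir: Optional[str] = None,
                     user: Optional[str] = None) -> List[str]:
    """Assemble the docker run argument list for one ensemble member."""
    in_opts = "ro,z" if selinux else "ro"   # read-only inputs, relabelled on SELinux hosts
    out_suffix = ":z" if selinux else ""    # outputs stay writable
    cmd = ["docker", "run", "--rm", "--name", f"modA_{ens_id}"]
    cmd += ["-v", f"{host_input}:/project/input:{in_opts}"]
    cmd += ["-v", f"{host_output}:/project/output{out_suffix}"]
    if workdir:
        cmd += ["-w", workdir]              # for apps that use relative paths
    if user:
        cmd += ["--user", user]             # e.g. "1000:1000"
    cmd += ["-e", f"ENS_ID={ens_id}", image]
    return cmd
```

Passing user=f"{os.getuid()}:{os.getgid()}" mirrors the --user $(id -u):$(id -g) suggestion, and the returned list can go straight to subprocess.run.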
In your Prefect task, keep logging stdout/stderr and raise on validation failure (so Prefect marks the task as failed and retries if configured). If you still see EBADF after fixing the mount target, please share:
- The exact container command your app runs to open the file
- OS (Linux/macOS/Windows), and whether you’re on SELinux
- The precise input file path on the host
We can refine from there.
Tinh
08/21/2025, 12:38 PM
Marvin
08/21/2025, 12:38 PM
Marvin
08/21/2025, 12:41 PM
Option A: stagger submissions in the flow
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
import time

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def run_mod(base_dir: str):
    ens_ids = [f"{i:02d}" for i in range(51)]
    futures = []
    for i, ens_id in enumerate(ens_ids):
        futures.append(run_fc.submit(ens_id=ens_id, base_dir=base_dir))
        if i < len(ens_ids) - 1:
            time.sleep(30)  # stagger next start by 30s
    return [f.result() for f in futures]
Option B: add a per-task delay parameter and sleep inside the task
import time
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def run_fc(ens_id: str, base_dir: str, start_delay_s: int = 0):
    # The delay counts from when the task starts running, not from flow start,
    # and a sleeping task still occupies one of the max_workers slots.
    if start_delay_s:
        time.sleep(start_delay_s)
    return run_con(image_name="modA:2.1.2", local_path=base_dir, ens_id=ens_id, ens=True)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def run_mod(base_dir: str):
    ens_ids = [f"{i:02d}" for i in range(51)]
    futures = [run_fc.submit(e, base_dir, start_delay_s=i*30) for i, e in enumerate(ens_ids)]
    return [f.result() for f in futures]
Will this avoid your read conflict?
- It can reduce pressure on the filesystem and may “mask” the issue, but the robust fix is to ensure:
- inputs are mounted read-only at the exact in-container path your app uses, and
- outputs/temp paths are unique per run.
- If your inputs are on a network filesystem and you’re reading NetCDF/HDF5 in parallel, consider disabling HDF5 file locking inside the container:
- add -e HDF5_USE_FILE_LOCKING=FALSE to docker run
- Keep concurrency reasonable with ThreadPoolTaskRunner(max_workers=...) to avoid overwhelming the host.
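Because docker run options must come before the image name, the environment flag has to be inserted ahead of it, not appended to the end of the command list. A small sketch of that, using a hypothetical helper add_env and assuming the image name is the last element of the list (no container arguments after it):

```python
from typing import Dict, List

def add_env(cmd: List[str], env: Dict[str, str]) -> List[str]:
    """Return a copy of cmd with -e KEY=VALUE pairs inserted before the image name.

    Assumes the image is the last element of cmd (no arguments follow it).
    """
    *head, image = cmd
    flags: List[str] = []
    for key, value in env.items():
        flags += ["-e", f"{key}={value}"]
    return head + flags + [image]

base = ["docker", "run", "--rm", "modA:2.1.2"]
print(add_env(base, {"HDF5_USE_FILE_LOCKING": "FALSE"}))
```

In run_con the same effect comes from adding the -e pair to cmd before the cmd += [image_name] line.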
Refs:
- Task runners: Prefect 3 task runners
- Global concurrency/rate controls (if using Prefect Cloud/Server): Global concurrency limits
Tinh
08/21/2025, 2:36 PM
docker run does not help in my case
# Post-run validation: check that expected output exists
# Adjust these checks to your actual outputs
out_dir = os.path.join(base_dir, "Output", "202508", ens_id)  # or shared dir if that's your design
# If you're writing to a shared folder, check for expected files by pattern instead
produced_anything = os.path.exists(out_dir) and any(os.scandir(out_dir))
if not produced_anything:
    # Treat as a failure so Prefect marks task as failed and retries if configured
    raise RuntimeError(f"[{ens_id}] Container exited but no outputs were produced at {out_dir}")
This block does not seem to work correctly. My containers do write their output to out_dir, but the post-run validation does not seem to find anything right after the container exits, so it reports an error and keeps retrying...
How could I improve this, or should I just ignore this block?
Marvin
08/21/2025, 2:37 PM
Marvin
08/21/2025, 2:37 PM
Option 1: Short polling for outputs
import os, time
from pathlib import Path

def wait_for_outputs(path: str | Path, min_files: int = 1, timeout_s: int = 20, poll_s: float = 1.0) -> bool:
    path = Path(path)
    deadline = time.monotonic() + timeout_s
    while True:
        if path.is_dir():
            try:
                files = [e for e in os.scandir(path) if e.is_file()]
            except FileNotFoundError:
                files = []
            if len(files) >= min_files:
                return True
        # Always check at least once; give up once the deadline has passed
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)

# In your task after docker run:
ok = wait_for_outputs(out_dir, min_files=1, timeout_s=20, poll_s=1)
if not ok:
    raise RuntimeError(f"[{ens_id}] Container exited but no outputs were produced at {out_dir}")
Option 2: Use a sentinel “done” file
Have the container write an atomic marker like /project/output/_SUCCESS only after all outputs are flushed. Then your task just checks for that specific file:
if not Path(out_dir, "_SUCCESS").exists():
    raise RuntimeError(f"[{ens_id}] Missing _SUCCESS in {out_dir}")
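On the container side, one way to make the marker atomic is to write it under a temporary name and rename it into place. The sketch below assumes the model's last step can run a bit of Python; mark_success and the temporary filename are hypothetical, and os.replace is only atomic when both names live on the same filesystem.

```python
import os
from pathlib import Path

def mark_success(output_dir) -> Path:
    """Write the _SUCCESS marker atomically as the very last step of a run."""
    out = Path(output_dir)
    tmp = out / "._SUCCESS.tmp"
    final = out / "_SUCCESS"
    with open(tmp, "w") as fh:
        fh.write("done\n")
        fh.flush()
        os.fsync(fh.fileno())  # push the marker contents to disk before renaming
    os.replace(tmp, final)     # rename within the same directory
    return final
```

Inside the container this would be called as mark_success("/project/output") after all result files are closed, so the host-side check never sees a marker for a half-written run.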
Option 3: Trust the exit code and downgrade the check to a warning
If your container already validates its work and exits non-zero on failure, you can skip strict validation in Prefect (or just log a warning if the directory looks empty):
if not produced_anything:
    logger.warning(f"[{ens_id}] No files seen yet in {out_dir}; container exited 0. Proceeding.")
Also double-check the path you’re validating
Make sure the host path you’re checking matches the path you mounted for output. If you want .../Output/202508/<ens_id>, mount that exact path into the container:
host_output = Path(base_dir) / "Output" / "202508" / ens_id
host_output.mkdir(parents=True, exist_ok=True)
cmd += ["-v", f"{host_output}:/project/output"]
If you’re on SELinux (Fedora/RHEL), add :z to the bind mount to avoid permission/labeling issues:
-v "<host_path>:/project/output:z"
Given your symptoms, I’d recommend Option 1 (short polling) or Option 2 (sentinel file) for robustness. If your container’s exit code is reliable and you prefer simplicity, Option 3 is fine too.