# marvin-ai
m
thought for 1771.7 seconds
It looks like you want to run the same containerized job for 51 ensemble members in parallel. A few key points and a clean pattern for Prefect 3.x:

1) Define the task to accept a single `ens_id`. Right now your task references the entire `ens_ids` list. Instead, parameterize the task so each run launches one container for one ID.
2) Map or submit the task over your IDs. Use `.map(...)` if you like the functional style, or `.submit(...)` in a loop for more control.
3) Control how many run concurrently. If you run this on one machine, you usually don't want to start all 51 at once. Use `ThreadPoolTaskRunner(max_workers=...)` to cap local concurrency.
4) Avoid input/output races. If every container reads `./input/config.json` and writes to `./output`, give each run its own mount directory so they don't clobber each other.

Here's a complete example that:
- creates a per-ensemble mount directory,
- writes a per-ensemble `config.json`,
- launches your container runner,
- and caps concurrency to 8 at a time.
```python
from prefect import flow, task, unmapped
from prefect.task_runners import ThreadPoolTaskRunner
from pathlib import Path
import json

# Your container runner
# def run_con(image_name: str, local_path: str, ens_id: str, ens: bool): ...

@task(retries=1)  # optional retries
def run_mod(ens_id: str, base_mount: str):
    # Create per-ens mount dir
    mount = Path(base_mount) / f"ens_{ens_id}"
    (mount / "input").mkdir(parents=True, exist_ok=True)
    (mount / "output").mkdir(parents=True, exist_ok=True)

    # Write per-ens config
    cfg = {"ensemble_member": ens_id}
    (mount / "input" / "config.json").write_text(json.dumps(cfg))

    # Launch one container for this ens_id
    return run_con(
        image_name="run_modA:fc:2.1",
        local_path=str(mount),
        ens_id=ens_id,
        ens=False,
    )

@flow(name="WaterGAP: Model Computation",
      log_prints=True,
      task_runner=ThreadPoolTaskRunner(max_workers=8))
def run_watergap(base_mount: str):
    ens_ids = [f"{i:02d}" for i in range(51)]

    # Option A: mapping
    futures = run_mod.map(ens_ids, base_mount=unmapped(base_mount))

    # Option B: submit in a loop
    # futures = [run_mod.submit(e, base_mount) for e in ens_ids]

    # Wait for completion
    return [f.result() for f in futures]

# Example local call
# run_watergap("/data/watergap")
```
Notes and caveats:
- Concurrency: the example caps runs to 8 concurrent containers. If you truly want 51 at once on the same host, set `max_workers=51` (but be careful with CPU, RAM, disk, and Docker daemon load).
- Defaults: if you omit `max_workers`, Prefect defers to the underlying thread pool's default, which is generally not tuned for launching lots of heavy external processes.
- Global concurrency limits: if you use Prefect Cloud/Server and want to coordinate concurrency across multiple runs or machines, you can add a global limit and gate the task with the concurrency context manager (see the sketch after the questions below). See Global concurrency limits.
- Task runners: more about task runners in Prefect 3.x here: Task runners.

Questions to make sure this fits:
- Do you want all 51 containers to start at once, or should we cap them to avoid overwhelming the host?
- Are you running locally or via a deployment with a work pool (Docker/Kubernetes/etc.)?
- Is it okay to use separate mount directories per ensemble member as shown?
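For reference, here is a minimal sketch of gating each container launch behind a global concurrency limit, assuming the `prefect.concurrency.sync` context manager available in recent Prefect 3.x releases; the limit name `docker-runs` is illustrative and would have to be created in Prefect Cloud/Server first.

```python
# Hedged sketch: gate container launches with a pre-created global concurrency
# limit. "docker-runs" is an illustrative limit name, not part of the example
# above; create it in Prefect Cloud/Server (UI or CLI) before relying on it.
from prefect import task
from prefect.concurrency.sync import concurrency


@task(retries=1)
def run_mod_gated(ens_id: str, base_mount: str):
    # Hold one slot of the "docker-runs" limit while the container runs,
    # coordinating concurrency even across separate flow runs or machines.
    with concurrency("docker-runs", occupy=1):
        # Reuse the body of the run_mod task defined in the example above.
        return run_mod.fn(ens_id, base_mount)
```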
t
@Marvin sorry, your setup seems to be overcomplicated for my case. Here is how I simplify it:
```python
import subprocess

def run_con(image_name: str, local_path: str, ens_id: str, ens: bool = False):
    cmd = ["docker", "run"]
    cmd += ["-v", f"{local_path}:/outlast"]
    if ens:
        cmd += [
            "-e", f"ENS_ID={ens_id}",
            "-v", f"{local_path}/Input/InputWaterGAP:/outlast/Input/InputWaterGAP:ro",  # :ro to avoid read conflicts among containers
        ]

    cmd += [image_name]
    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode != 0:
        raise Exception(f"Docker run failed: {result.stderr}")

    print(result.stdout)
    return result.stdout
```
and below is my `task` and `flow`:
```python
@task
def run_fc(ens_id: str):
    return run_con(image_name="modA:2.1.2",
                   local_path=mount_dir,
                   ens_id=ens_id,
                   ens=True)


@flow
def run_mod():
    ens_ids = [str(i).zfill(2) for i in range(51)]
    futures = run_fc.map(ens_ids)
    results = [future.result() for future in futures]    
    return results
```
This works perfectly for Prefect 3.2.6. However, I don't know why some containers exit directly after starting (returning no results) and still end up with status "Completed". How can I check and handle this?
m
thought for 55.4 seconds
Got it, thanks for the clarification. What you're seeing is expected Prefect behavior: if your task function returns normally (even if the container exits immediately and produces no output), Prefect marks the task as "Completed". To detect and handle "no-op" containers, add simple post-run validation and raise an exception when expectations aren't met.

Minimal changes you can make:
1) Log more context and capture both stdout and stderr.
2) Validate outputs after the container finishes.
3) Raise on validation failure (Prefect will mark the task Failed, and you can add retries).
4) Optionally cap concurrency to reduce flakiness from resource contention.

Here's a compact version with those changes:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect import get_run_logger
import subprocess, os

def run_con(image_name: str, local_path: str, ens_id: str, ens: bool = False, timeout: int = 0):
    # Make the container name unique (handy for debugging)
    container_name = f"modA_{ens_id}"

    cmd = ["docker", "run", "--rm", "--name", container_name]
    cmd += ["-v", f"{local_path}:/outlast"]

    if ens:
        cmd += [
            "-e", f"ENS_ID={ens_id}",
            "-v", f"{local_path}/Input/InputWaterGAP:/outlast/Input/InputWaterGAP:ro",
        ]

    cmd += [image_name]

    # Optional timeout to avoid hangs
    run_kwargs = dict(capture_output=True, text=True)
    if timeout and timeout > 0:
        run_kwargs["timeout"] = timeout

    result = subprocess.run(cmd, **run_kwargs)

    if result.returncode != 0:
        raise RuntimeError(f"Docker run failed (ens_id={ens_id}): {result.stderr}")

    return result.stdout, result.stderr


@task(retries=2, retry_delay_seconds=30)
def run_fc(ens_id: str, base_dir: str):
    logger = get_run_logger()

    # Run one container
    stdout, stderr = run_con(
        image_name="modA:2.1.2",
        local_path=base_dir,
        ens_id=ens_id,
        ens=True,
        timeout=0,  # set e.g. 3600 to enforce a 1hr timeout
    )

    # Log output for observability
    if stdout:
        logger.info(f"[{ens_id}] STDOUT:\n{stdout}")
    if stderr:
        logger.info(f"[{ens_id}] STDERR:\n{stderr}")

    # Post-run validation: check that expected output exists
    # Adjust these checks to your actual outputs
    out_dir = os.path.join(base_dir, "Output", ens_id)  # or shared dir if that’s your design
    # If you’re writing to a shared folder, check for expected files by pattern instead
    produced_anything = os.path.exists(out_dir) and any(os.scandir(out_dir))

    if not produced_anything:
        # Treat as a failure so Prefect marks task as failed and retries if configured
        raise RuntimeError(f"[{ens_id}] Container exited but no outputs were produced at {out_dir}")

    return {"ens_id": ens_id, "output_dir": out_dir}


@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))  # tune concurrency as needed
def run_mod(base_dir: str):
    ens_ids = [f"{i:02d}" for i in range(51)]
    futures = run_fc.map(ens_ids, base_dir=base_dir)
    results = [f.result() for f in futures]
    return results
```
Why this helps:
- If a container exits quickly with code 0 but doesn't produce the expected files, the task now raises, marking the task run as Failed and (optionally) retrying.
- You'll see stdout/stderr in the Prefect logs for each `ens_id`, which helps explain why it exited quickly (e.g., already-done checks, missing input, permissions, etc.).
- `max_workers=8` reduces resource contention; adjust it to your host capacity.

Optional improvements:
- Prevent output collisions: if multiple containers write the same filenames to a shared `Output` directory, some may exit early because they detect "work already done". If that's not desired, mount per-ensemble subdirectories: change the docker volume to `-v {base_dir}/ens_{ens_id}:/outlast` and write outputs under that (a short sketch follows at the end of this message).
- Global concurrency limits (if using Prefect Cloud/Server) can gate runs across machines: Global concurrency limits.
- Task runner docs for concurrency details: Task runners.

If you can share:
- the exact path(s) your container writes to for each ensemble
- the name(s) of the expected output files
…I can tailor the post-run validation snippet precisely to your layout.
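As a rough sketch of that per-ensemble mount idea (the image handling and directory layout are placeholders carried over from the examples above, not a confirmed setup):

```python
from pathlib import Path


def per_ensemble_mount_cmd(image_name: str, base_dir: str, ens_id: str) -> list[str]:
    """Sketch: give each ensemble member its own /outlast mount to avoid collisions."""
    ens_mount = Path(base_dir) / f"ens_{ens_id}"  # e.g. <base_dir>/ens_07
    ens_mount.mkdir(parents=True, exist_ok=True)
    return [
        "docker", "run", "--rm",
        "-e", f"ENS_ID={ens_id}",
        "-v", f"{ens_mount}:/outlast",  # per-ensemble working directory
        image_name,
    ]
```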
t
@Marvin Thanks! Although I mounted the input folder with `:ro`, it seems that lots of containers fail to access the input, as below:
```
OSError: [Errno 9] Bad file descriptor: '/project/input/static_inputdata_22e_elevrange.nc4
```
m
thought for 332.0 seconds
Thanks for the details; that error message is the key. Your container is trying to read from `/project/input/static_inputdata_22e_elevrange.nc4`, but you're mounting your inputs at `/outlast/Input/InputWaterGAP:ro`. Inside the container there is no data at `/project/input`, so the program's file open fails; depending on the library, that can surface as "Bad file descriptor".

Fix: mount the host input directory to the exact path your app expects inside the container (i.e., `/project/input`), and keep the output mount separate.

Updated `run_con` with correct mounts and better diagnostics:
```python
import subprocess
from pathlib import Path

def run_con(image_name: str, local_path: str, ens_id: str, ens: bool = False, timeout: int = 0):
    # Host paths
    host_input = Path(local_path) / "Input" / "InputWaterGAP"
    host_output = Path(local_path) / "Output" / ens_id
    host_output.mkdir(parents=True, exist_ok=True)

    # Sanity check: input exists on host before we run the container
    if not host_input.exists():
        raise FileNotFoundError(f"Host input directory not found: {host_input}")

    cmd = ["docker", "run", "--rm", "--name", f"modA_{ens_id}"]

    # Mount inputs to the path your app expects INSIDE the container
    # Add :z on SELinux systems (Fedora/RHEL) -> f"...:/project/input:ro,z"
    cmd += ["-v", f"{host_input}:/project/input:ro"]

    # Mount per-ens output to avoid collisions
    cmd += ["-v", f"{host_output}:/project/output"]

    if ens:
        cmd += ["-e", f"ENS_ID={ens_id}"]

    cmd += [image_name]

    run_kwargs = dict(capture_output=True, text=True)
    if timeout and timeout > 0:
        run_kwargs["timeout"] = timeout

    result = subprocess.run(cmd, **run_kwargs)

    if result.returncode != 0:
        raise RuntimeError(f"Docker run failed (ens_id={ens_id}): {result.stderr}")

    return result.stdout
```
Quick preflight to verify the mount is correct. Run this once to confirm the container sees the file at the expected path:
```bash
docker run --rm \
  -v "<LOCAL_PATH>/Input/InputWaterGAP:/project/input:ro" \
  <your_image> \
  sh -lc "ls -l /project/input && test -f /project/input/static_inputdata_22e_elevrange.nc4 && echo OK"
```
If that prints OK, the mount path is correct. If not, either the file doesn't exist at the host path, casing differs (Linux is case-sensitive), or the container is looking in a different directory.

Extra tips (a sketch of how these flags could be wired into the command list follows the list):
- SELinux: on Fedora/RHEL with SELinux enforcing, add `:z` to the bind mount so the container is allowed to read, e.g. `.../project/input:ro,z`.
- Working directory assumptions: if your app uses relative paths, set the working directory with `-w /project` or adjust your app to use absolute paths.
- Permissions: if the files are owned by a user the container process can't read, run the container as a compatible user (e.g., add `--user $(id -u):$(id -g)`) and ensure permissions are at least 0444 on inputs.
- Concurrency: reading the same inputs from 51 containers is fine with `:ro`, but make sure outputs are per-ensemble to prevent conflicts (as shown above).

In your Prefect task, keep logging stdout/stderr and raise on validation failure (so Prefect marks the task as failed and retries if configured).

If you still see EBADF after fixing the mount target, please share:
- the exact command your app runs inside the container to open the file
- your OS (Linux/macOS/Windows), and whether you're on SELinux
- the precise input file path on the host

We can refine from there.
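To make those tips concrete, here is a small, hedged sketch of how the optional flags could be appended to the `docker run` command list built in `run_con`; the working directory, the host path, and the choice to run as the host user are all assumptions for illustration.

```python
import os


def optional_docker_flags(selinux: bool = False, run_as_host_user: bool = False) -> list[str]:
    """Sketch of the extra tips above; every concrete value here is illustrative."""
    flags = ["-w", "/project"]  # pin the working directory for relative-path code
    if run_as_host_user:
        # POSIX-only: run as the host user so bind-mounted inputs stay readable
        flags += ["--user", f"{os.getuid()}:{os.getgid()}"]
    mount_opts = "ro,z" if selinux else "ro"  # ",z" relabels the mount on SELinux hosts
    flags += ["-v", f"/data/watergap/Input/InputWaterGAP:/project/input:{mount_opts}"]
    return flags
```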
t
@Marvin how can I start multiple container runs at the same time but with a delay of 30s between starts? Would that avoid the reading conflict?
m
thought for 170.8 seconds
Short answer: yes, you can stagger starts by 30s. The easiest approach is to submit tasks sequentially with a sleep between submissions, so they still run concurrently but start offset from each other.

Option A: sleep between submits in the flow
```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
import time

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def run_mod(base_dir: str):
    ens_ids = [f"{i:02d}" for i in range(51)]
    futures = []
    for i, ens_id in enumerate(ens_ids):
        futures.append(run_fc.submit(ens_id=ens_id, base_dir=base_dir))
        if i < len(ens_ids) - 1:
            time.sleep(30)  # stagger next start by 30s
    return [f.result() for f in futures]
```
Option B: add a per-task delay parameter and sleep inside the task
```python
import time

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def run_fc(ens_id: str, base_dir: str, start_delay_s: int = 0):
    if start_delay_s:
        time.sleep(start_delay_s)  # stagger this run's start
    return run_con(image_name="modA:2.1.2", local_path=base_dir, ens_id=ens_id, ens=True)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def run_mod(base_dir: str):
    ens_ids = [f"{i:02d}" for i in range(51)]
    futures = [run_fc.submit(e, base_dir, start_delay_s=i*30) for i, e in enumerate(ens_ids)]
    return [f.result() for f in futures]
```
Will this avoid your read conflict?
- It can reduce pressure on the filesystem and may "mask" the issue, but the robust fix is to ensure that inputs are mounted read-only at the exact in-container path your app uses, and that output/temp paths are unique per run.
- If your inputs are on a network filesystem and you're reading NetCDF/HDF5 in parallel, consider disabling HDF5 file locking inside the container by adding `-e HDF5_USE_FILE_LOCKING=FALSE` to `docker run` (a short sketch follows this list).
- Keep concurrency reasonable with `ThreadPoolTaskRunner(max_workers=...)` to avoid overwhelming the host.

Refs:
- Task runners: Prefect 3 task runners
- Global concurrency/rate controls (if using Prefect Cloud/Server): Global concurrency limits
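For placement, a brief sketch of that environment variable in the command list used by `run_con` (the image tag and paths are simply carried over from the earlier examples):

```python
# Sketch: HDF5_USE_FILE_LOCKING is read by the HDF5 library inside the container;
# it only matters if the model opens NetCDF4/HDF5 files.
cmd = ["docker", "run", "--rm"]
cmd += ["-e", "HDF5_USE_FILE_LOCKING=FALSE"]  # disable HDF5 file locking
cmd += ["-v", "/data/watergap/Input/InputWaterGAP:/project/input:ro"]
cmd += ["modA:2.1.2"]
```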
t
@Marvin The delay works nicely. By the way, adding `-e HDF5_USE_FILE_LOCKING=FALSE` to `docker run` does not help in my case.
```python
    # Post-run validation: check that expected output exists
    # Adjust these checks to your actual outputs
    out_dir = os.path.join(base_dir, "Output", "202508", ens_id)  # or shared dir if that's your design
    # If you're writing to a shared folder, check for expected files by pattern instead
    produced_anything = os.path.exists(out_dir) and any(os.scandir(out_dir))

    if not produced_anything:
        # Treat as a failure so Prefect marks task as failed and retries if configured
        raise RuntimeError(f"[{ens_id}] Container exited but no outputs were produced at {out_dir}")
```
This block seems to work incorrectly. Even though my containers return their output to the `out_dir`, the post-run validation does not seem to find anything shortly after a container exits, so it reports an error and keeps retrying... How could I improve this, or should I just drop this block?
m
thought for 40.5 seconds
You're right: immediate post-run checks can race with the filesystem (bind mounts, NFS, overlayfs) and "miss" files that appear a moment later. You have a few options.

Option 1: Add a short polling window before failing. Poll the output directory for a few seconds after the container exits. This avoids false negatives without relying on Prefect retries.
```python
import os, time
from pathlib import Path

def wait_for_outputs(path: str | Path, min_files: int = 1, timeout_s: int = 20, poll_s: float = 1.0) -> bool:
    path = Path(path)
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if path.is_dir():
            try:
                files = [e for e in os.scandir(path) if e.is_file()]
            except FileNotFoundError:
                files = []
            if len(files) >= min_files:
                return True
        time.sleep(poll_s)
    return False

# In your task after docker run:
ok = wait_for_outputs(out_dir, min_files=1, timeout_s=20, poll_s=1)
if not ok:
    raise RuntimeError(f"[{ens_id}] Container exited but no outputs were produced at {out_dir}")
```
Option 2: Use a sentinel "done" file. Have the container write an atomic marker like `/project/output/_SUCCESS` only after all outputs are flushed. Then your task just checks for that specific file (a sketch of writing the marker atomically follows the check):
```python
if not Path(out_dir, "_SUCCESS").exists():
    raise RuntimeError(f"[{ens_id}] Missing _SUCCESS in {out_dir}")
```
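If you control the container's entrypoint, a minimal sketch of writing that marker atomically (write under a temporary name, then rename) could look like this; the `/project/output` path is the in-container mount assumed in the earlier examples.

```python
import os
from pathlib import Path


def write_success_marker(output_dir: str = "/project/output") -> None:
    """Sketch: create _SUCCESS atomically so a partially written marker is never visible."""
    out = Path(output_dir)
    tmp = out / "_SUCCESS.tmp"
    tmp.write_text("ok\n")  # write under a temporary name first
    os.replace(tmp, out / "_SUCCESS")  # atomic rename on the same filesystem
```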
Option 3: Trust the exit code and downgrade the check to a warning. If your container already validates its work and exits non-zero on failure, you can skip strict validation in Prefect (or just log a warning if the directory looks empty):
```python
if not produced_anything:
    logger.warning(f"[{ens_id}] No files seen yet in {out_dir}; container exited 0. Proceeding.")
```
Also double-check the path you're validating: make sure the host path you're checking matches the path you mounted for output. If you want `.../Output/202508/<ens_id>`, mount that exact path into the container:
```python
host_output = Path(base_dir) / "Output" / "202508" / ens_id
host_output.mkdir(parents=True, exist_ok=True)
cmd += ["-v", f"{host_output}:/project/output"]
```
If you're on SELinux (Fedora/RHEL), add `:z` to the bind mount to avoid permission/labeling issues:
```
-v "<host_path>:/project/output:z"
```
Given your symptoms, I’d recommend Option 1 (short polling) or Option 2 (sentinel file) for robustness. If your container’s exit code is reliable and you prefer simplicity, Option 3 is fine too.