Tinh
07/10/2025, 1:08 PM
`imageA` will read the `config.json` and perform the task. Could you give me an example of this run in a Prefect flow?

Marvin
07/10/2025, 1:09 PM
```python
from prefect import flow, task
from typing import List
import docker
from prefect.tasks import task_input_hash
from datetime import timedelta


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def run_container(container_index: int) -> str:
    """Run a single container instance"""
    client = docker.from_env()

    # Mount the config.json into the container
    volumes = {
        '/path/to/your/config.json': {
            'bind': '/app/config.json',
            'mode': 'ro'  # Read-only mode
        }
    }

    # Run the container
    container = client.containers.run(
        image="imageA",
        volumes=volumes,
        detach=True,
        environment={
            "CONTAINER_INDEX": str(container_index)
            # Add any other environment variables needed
        }
    )

    # Wait for the container to complete, then capture logs and clean up
    result = container.wait()
    logs = container.logs().decode()
    container.remove()

    return f"Container {container_index} completed with exit code {result['StatusCode']}"


@flow(name="Climate Service Parallel Processing")
def process_climate_data(num_containers: int = 51) -> List[str]:
    """Flow to run multiple climate service containers in parallel"""
    # Create a list of container indices to process
    container_indices = list(range(num_containers))

    # Run containers in parallel using map
    results = run_container.map(container_indices)
    return results


if __name__ == "__main__":
    process_climate_data()
```
This example includes several important features:

1. *Parallel Execution*: Using `run_container.map()`, Prefect will automatically handle running the containers in parallel.
2. *Container Management*:
   - Each container gets your `config.json` mounted
   - Containers are properly cleaned up after execution
   - Logs are captured
   - Container exit codes are monitored
3. *Caching*: The task includes caching with `cache_key_fn=task_input_hash` to avoid rerunning containers unnecessarily.
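If you ever need different cache behavior, a cache key function is just a callable that receives the task run context and the task's parameters; here's a minimal sketch (the per-index key scheme is illustrative, not part of the code above):
```python
from prefect.context import TaskRunContext

def per_container_cache_key(context: TaskRunContext, parameters: dict) -> str:
    # One cache entry per container index, regardless of other inputs
    return f"climate-run-{parameters['container_index']}"

# usage: @task(cache_key_fn=per_container_cache_key, cache_expiration=timedelta(hours=1))
```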
To use this code:

1. Install the required dependencies:
```bash
pip install prefect docker
```
2. Modify these parts of the code:
   - Replace `/path/to/your/config.json` with the actual path to your config file
   - Replace `imageA` with your actual Docker image name
   - Add any additional environment variables your container needs
   - Adjust the volume mounting configuration if needed
3. Run the flow:
```python
process_climate_data(num_containers=51)
```
Some important notes:

1. *Resource Management*: Make sure your machine has enough resources to handle 51 containers running simultaneously. Note that `.map()` does not take a concurrency argument; to limit how many task runs execute at once, tag the task and create a tag-based concurrency limit:
```python
# Tag the task, then cap it from the CLI:
#   prefect concurrency-limit create docker-run 10
@task(tags=["docker-run"], cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def run_container(container_index: int) -> str:
    ...
```
2. *Error Handling*: The code includes only basic error handling; you may want something more robust depending on your needs (see the sketch after this list).
3. *Monitoring*: You can monitor progress through the Prefect UI, which shows the status of each container task.
4. *Container Cleanup*: The code automatically removes containers after they complete. Make sure this is the behavior you want.
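For instance, a rough sketch of sturdier error handling with Prefect retries and guaranteed container cleanup (the retry counts and `force=True` removal are illustrative choices):
```python
import docker
from prefect import task

@task(retries=2, retry_delay_seconds=30)
def run_container_safely(container_index: int) -> str:
    """Variant of run_container that retries on failure and always cleans up."""
    client = docker.from_env()
    container = client.containers.run(
        image="imageA",
        detach=True,
        environment={"CONTAINER_INDEX": str(container_index)},
    )
    try:
        result = container.wait()
        if result["StatusCode"] != 0:
            # Surface the container logs in the failure so Prefect records them
            raise RuntimeError(container.logs().decode())
        return f"Container {container_index} succeeded"
    finally:
        container.remove(force=True)  # remove even if wait() raised
```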
If you need to deploy this flow, you can use Prefect's deployment features:
```python
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=process_climate_data,
    name="climate-service-deployment",
    version="1",
    work_queue_name="default"
)
deployment.apply()
```
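Once applied, the deployment only runs when an agent is polling its work queue; assuming the Prefect 2-style agent setup that `work_queue_name` implies:
```bash
# Start an agent that picks up runs from the "default" work queue
prefect agent start -q default
```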
Tinh
07/10/2025, 1:31 PM
There is an `ens_id` inside the `config.json` file, e.g.
```json
{
  "ens_id": "00"
}
```
How can I guide these 51 containers to run from 00 to 50 in this case?

A question about volumes: normally I only need to mount:
`'/my/local/path': '/docker/path'`
When do we need to define `bind` and `mode`?
Marvin
07/10/2025, 1:31 PM
```python
from prefect import flow, task
from typing import List
import docker
from prefect.tasks import task_input_hash
from datetime import timedelta
import json
import os


@task
def create_config(ens_id: str, config_dir: str) -> str:
    """Create a config file for each ensemble member"""
    # Ensure the directory exists
    os.makedirs(config_dir, exist_ok=True)

    config = {
        "ens_id": f"{int(ens_id):02d}"  # Formats numbers as 00, 01, ..., 50
    }

    # Create a unique config file for this ensemble
    config_path = os.path.join(config_dir, f"config_{ens_id}.json")
    with open(config_path, "w") as f:
        json.dump(config, f, indent=2)

    return config_path


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def run_container(ens_id: int) -> str:
    """Run a single container instance for a specific ensemble ID"""
    client = docker.from_env()

    # Create the config file for this ensemble
    config_path = create_config(ens_id, "/tmp/climate_configs")

    # Simple volume mounting
    volumes = {
        config_path: '/docker/path/config.json'  # Mount the config file where your container expects it
    }

    # Run the container
    container = client.containers.run(
        image="imageA",
        volumes=volumes,
        detach=True,
        environment={
            "ENS_ID": f"{int(ens_id):02d}"  # Pass the ensemble ID as an environment variable if needed
        }
    )

    # Wait for the container to complete and get logs
    result = container.wait()
    logs = container.logs().decode()
    container.remove()

    # Clean up the temporary config file
    os.remove(config_path)

    return f"Ensemble {ens_id:02d} completed with exit code {result['StatusCode']}"


@flow(name="Climate Service Ensemble Processing")
def process_climate_ensembles(num_ensembles: int = 51) -> List[str]:
    """Flow to run multiple climate service containers in parallel"""
    # Create a list of ensemble IDs (0 to 50)
    ensemble_ids = list(range(num_ensembles))

    # Run containers in parallel using map
    results = run_container.map(ensemble_ids)
    return results


if __name__ == "__main__":
    process_climate_ensembles()
```
Now, regarding your volume mounting question!

The simple format `'/my/local/path': '/docker/path'` is actually a shorthand that Docker provides. When you use this format, Docker automatically assumes:
- a `bind` mount type (as opposed to `volume` or `tmpfs`)
- `rw` mode (read-write access)

You need to use the more verbose format with `bind` and `mode` when you want to:

1. Explicitly control read/write access:
```python
volumes = {
    '/my/local/path': {
        'bind': '/docker/path',
        'mode': 'ro'  # read-only
    }
}
```
2. Specify different mount types:
```python
volumes = {
    '/my/local/path': {
        'bind': '/docker/path',
        'mode': 'rw'  # read-write
    },
    'my_named_volume': {
        'bind': '/docker/data',
        'mode': 'rw'
    }
}
```
For your use case, since you're just mounting a config file, the simple format is perfectly fine:
```python
volumes = {
    config_path: '/docker/path/config.json'
}
```
The updated flow above:
1. Creates a unique config file for each ensemble member (00-50)
2. Mounts it into the container
3. Cleans up the temporary config file after the container finishes
4. Uses the simpler volume mounting syntax
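If you want to sanity-check the zero-padded IDs without starting a flow run, you can call the plain function behind the `create_config` task via `.fn` (a quick illustrative check):
```python
# Bypass the task machinery and call the underlying Python function
path = create_config.fn(7, "/tmp/climate_configs")
print(open(path).read())  # prints {"ens_id": "07"} (pretty-printed)
```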
Would you like me to explain any part of this in more detail or help you adjust it further for your specific needs?

Tinh
07/10/2025, 2:11 PM
```
v.get('bind') for v in volumes.values()
      ^^^^^
AttributeError: 'str' object has no attribute 'get'
```
I got this from this command:
```python
container = client.containers.run(
    image="my-image",
    volumes=volumes,
    detach=True,
    environment={
        "ENS_ID": f"{int(ens_id):02d}"  # Pass ensemble ID as environment variable if needed
    }
)
```
Marvin
07/10/2025, 2:11 PM
The `AttributeError: 'str' object has no attribute 'get'` happens because, when volumes are passed as a dictionary, the Docker Python SDK expects each value to be a dictionary with `bind` and `mode` keys; the simple `path: path` string mapping is not accepted there. Here's how to fix it:
```python
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def run_container(ens_id: int) -> str:
    """Run a single container instance for a specific ensemble ID"""
    client = docker.from_env()

    # Create config file for this ensemble
    config_path = create_config(ens_id, "/tmp/climate_configs")

    # Fix: use the full dictionary format for volumes
    volumes = {
        config_path: {
            'bind': '/docker/path/config.json',
            'mode': 'rw'
        }
    }

    # Run the container
    container = client.containers.run(
        image="my-image",
        volumes=volumes,
        detach=True,
        environment={
            "ENS_ID": f"{int(ens_id):02d}"
        }
    )
    # Rest of your code...
```
Or alternatively, if you want to keep the simpler style, use the string format and pass it as a list:
```python
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def run_container(ens_id: int) -> str:
    client = docker.from_env()
    config_path = create_config(ens_id, "/tmp/climate_configs")

    # Fix: use the "source:target" string format for simple volume mounting
    volume_spec = f"{config_path}:/docker/path/config.json"

    container = client.containers.run(
        image="my-image",
        volumes=[volume_spec],  # Note: pass as a list
        detach=True,
        environment={
            "ENS_ID": f"{int(ens_id):02d}"
        }
    )
    # Rest of your code...
```
The error occurred because the Docker SDK expects either:
1. The full dictionary format with `'bind'` and `'mode'`, or
2. A string in the format `"source:target"`, passed inside a list

Choose whichever format works better for your use case. The first format (with explicit `bind` and `mode`) gives you more control, while the second is more concise.

Let me know if you need any clarification or run into other issues!

Tinh
07/10/2025, 4:23 PM

Marvin
07/10/2025, 4:23 PM
```python
from prefect import flow, task, unmapped
from typing import List
import docker
from prefect.tasks import task_input_hash
from datetime import timedelta
import psutil  # For getting system resources


@task
def calculate_container_resources(total_containers: int = 51) -> dict:
    """Calculate resources per container based on system availability"""
    # Get system resources
    total_cores = psutil.cpu_count(logical=False)   # Physical cores
    total_threads = psutil.cpu_count(logical=True)  # Logical cores (threads)
    total_memory = psutil.virtual_memory().total

    # Convert memory to GB for easier handling
    total_memory_gb = total_memory / (1024 * 1024 * 1024)

    # Calculate resources per container,
    # leaving some headroom for the system (use 80% of total resources)
    usable_cores = max(1, (total_cores * 0.8) / total_containers)
    usable_memory = max(1, (total_memory_gb * 0.8) / total_containers)

    return {
        "cpu_count": round(usable_cores, 2),
        "memory_limit": f"{round(usable_memory, 2)}g"
    }


@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    tags=["climate-container"],  # target of the concurrency limit (see flow docstring)
)
def run_container(ens_id: int, resources: dict) -> str:
    """Run a single container instance with specified resources"""
    client = docker.from_env()
    config_path = create_config(ens_id, "/tmp/climate_configs")

    volumes = {
        config_path: {
            'bind': '/docker/path/config.json',
            'mode': 'rw'
        }
    }

    # Create the container with resource constraints
    container = client.containers.run(
        image="my-image",
        volumes=volumes,
        detach=True,
        environment={
            "ENS_ID": f"{int(ens_id):02d}"
        },
        # Resource constraints. On Linux, fractional CPU limits go through
        # nano_cpus (units of 1e-9 CPUs); cpu_count only applies on Windows.
        nano_cpus=int(resources["cpu_count"] * 1e9),
        mem_limit=resources["memory_limit"],  # Memory limit
        cpu_shares=1024,  # Relative CPU share weight
    )

    result = container.wait()
    logs = container.logs().decode()
    container.remove()

    return f"Ensemble {ens_id:02d} completed with exit code {result['StatusCode']}"


@flow(name="Climate Service Ensemble Processing")
def process_climate_ensembles(num_ensembles: int = 51) -> List[str]:
    """
    Flow to run multiple climate service containers with resource management.

    To cap how many containers run at once, create a concurrency limit on
    the task's tag (Prefect's .map() has no max_concurrency argument):
        prefect concurrency-limit create climate-container 10
    """
    # Calculate resources per container
    resources = calculate_container_resources(num_ensembles)

    # Create a list of ensemble IDs (0 to 50)
    ensemble_ids = list(range(num_ensembles))

    # Run containers in parallel; unmapped() hands the same resources
    # dict to every mapped task run
    results = run_container.map(ensemble_ids, unmapped(resources))
    return results


if __name__ == "__main__":
    process_climate_ensembles()
```
This solution provides several ways to manage resources:

1. *Automatic Resource Calculation*:
```python
# Calculate based on system resources
resources = calculate_container_resources(num_ensembles)
```
2. *Parallel Execution Control*:
```bash
# Cap how many containers run simultaneously via the task's tag
prefect concurrency-limit create climate-container 10
```
3. *Fine-Tuned Resource Allocation*:
You can also manually specify resources if you prefer; a sketch of such a variant follows.
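Here's a minimal sketch of what that manual variant could look like, assuming it reuses `run_container` and the tag-based concurrency limit from above (the exact parameter names are illustrative):
```python
from prefect import flow, unmapped
from typing import List

@flow(name="Climate Service Ensemble Processing (manual resources)")
def process_climate_ensembles_manual_resources(
    num_ensembles: int = 51,
    cores_per_container: float = 2,
    memory_per_container: str = "4g",
) -> List[str]:
    """Run the ensemble with explicitly chosen per-container resources."""
    # Skip the automatic calculation and build the resources dict directly
    resources = {
        "cpu_count": cores_per_container,
        "memory_limit": memory_per_container,
    }
    ensemble_ids = list(range(num_ensembles))
    # Same pattern as above: one mapped task run per ensemble member,
    # all sharing the manually chosen resources dict
    return run_container.map(ensemble_ids, unmapped(resources))
```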
Marvin
07/10/2025, 4:23 PM
Usage examples:
1. *Automatic resource distribution*:
```python
# Let the system calculate optimal resources; the tag-based
# concurrency limit caps how many containers run at once
process_climate_ensembles()
```
2. *Manual resource allocation*:
```python
# Manually specify resources per container
process_climate_ensembles_manual_resources(
    cores_per_container=2,
    memory_per_container="4g"
)
```
Additional considerations:

1. *CPU Pinning*: If you need specific containers to use specific CPU cores:
```python
# In the run_container task
container = client.containers.run(
    ...,
    cpuset_cpus="0-1"  # Use CPU cores 0 and 1
)
```
2. *Memory Swapping*: Control swap usage:
```python
container = client.containers.run(
    ...,
    mem_swappiness=0,     # Disable swapping
    mem_reservation="1g"  # Soft memory limit
)
```
3. *Monitoring*: You might want to add resource monitoring:
```python
@task
def monitor_container_resources(container_id: str):
    client = docker.from_env()
    container = client.containers.get(container_id)
    stats = container.stats(stream=False)
    # Process and return resource usage statistics
    return stats
```
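The raw `stats` payload is Docker's stats JSON; for example, to pull a couple of headline numbers out of it (a sketch; the key layout follows the Docker stats API on Linux and may differ elsewhere):
```python
stats = monitor_container_resources(container_id)
mem_used_mib = stats["memory_stats"]["usage"] / (1024 * 1024)
cpu_total_ns = stats["cpu_stats"]["cpu_usage"]["total_usage"]  # cumulative CPU time
print(f"memory: {mem_used_mib:.1f} MiB, cpu time: {cpu_total_ns} ns")
```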
Remember that the actual resource availability depends on your system's capabilities and other running processes. You might need to adjust the calculations based on your specific requirements and system constraints.
Would you like me to explain any part of this in more detail or help you adjust the resource allocation strategy for your specific needs?