Tinh
08/05/2025, 11:37 AM
HPC with the queuing framework?
If so, could you please give me a hands-on example of how it can be done?
Example context: the deploy.py file will run 5 flows: A, B, C, D and E. Flow A runs on the VM. Flow B is too heavy, so it will be submitted to HPC. When flow B is done, flow C continues on the VM, and flow D will again run on HPC. Flow E will finish on the VM. So flows B and D will be submitted to HPC.
Marvin
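08/05/2025, 11:37 AM
Marvin
08/05/2025, 11:38 AM
```python
# deploy.py
from prefect import flow
from prefect.deployments import Deployment, run_deployment


# Flow definitions
@flow(name="flow_a")
def flow_a():
    print("Running Flow A on VM")
    return "A done"


@flow(name="flow_b")
def flow_b():
    print("Running Flow B on HPC")
    return "B done"


@flow(name="flow_c")
def flow_c():
    print("Running Flow C on VM")
    return "C done"


@flow(name="flow_d")
def flow_d():
    print("Running Flow D on HPC")
    return "D done"


@flow(name="flow_e")
def flow_e():
    print("Running Flow E on VM")
    return "E done"


def run_on_hpc(deployment_name):
    # Start a run of the HPC deployment (picked up by the hpc-pool worker) and
    # block until it reaches a final state; run_deployment waits by default
    flow_run = run_deployment(name=deployment_name)
    if not flow_run.state.is_completed():
        raise RuntimeError(f"{deployment_name} finished in state {flow_run.state.name}")
    return flow_run


# Parent flow that orchestrates the execution. It is deployed to vm-pool, so
# the subflows it calls directly (A, C, E) run in the same process on the VM.
@flow(name="main_orchestrator")
def orchestrator():
    # Flow A on VM
    flow_a()
    # Flow B on HPC (via its own deployment on hpc-pool)
    run_on_hpc("flow_b/flow_b_hpc")
    # Flow C back on VM
    flow_c()
    # Flow D on HPC
    run_on_hpc("flow_d/flow_d_hpc")
    # Flow E on VM
    flow_e()


# Create deployments
if __name__ == "__main__":
    # The orchestrator (and the VM subflows it calls) runs on the process pool
    Deployment.build_from_flow(
        flow=orchestrator,
        name="vm_flows",
        work_pool_name="vm-pool",
    ).apply()
    # Flows B and D get their own deployments on the HPC pool
    for hpc_flow in (flow_b, flow_d):
        Deployment.build_from_flow(
            flow=hpc_flow,
            name=f"{hpc_flow.name}_hpc",
            work_pool_name="hpc-pool",
        ).apply()
```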
To set this up:
1. First, create the work pools:
```bash
# Create a process work pool for VM execution
prefect work-pool create "vm-pool" --type process
# Create a work pool for HPC execution
# The exact type and configuration will depend on your HPC setup
prefect work-pool create "hpc-pool" --type process  # You'll need to customize this based on your HPC infrastructure
```
2. Start workers for both pools:
```bash
# Start VM worker
prefect worker start -p "vm-pool"
# Start HPC worker
prefect worker start -p "hpc-pool"
```
3. Deploy your flows:
```bash
python deploy.py
```
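Whatever the deployment mechanics, the orchestrator has one job: run A, C and E locally, hand B and D to the HPC side, and not move on until each HPC step has finished. The framework-free sketch below shows only that control flow; the step table and the two callables are hypothetical stand-ins. In Prefect, the HPC dispatcher would typically be `run_deployment`, which by default polls until the triggered flow run reaches a final state.

```python
from typing import Callable, Iterable, List, Tuple

# Hypothetical step table: (flow name, target). "vm" steps run in-process,
# "hpc" steps go to a dispatcher that must block until the job has ended.
PIPELINE = [
    ("flow_a", "vm"),
    ("flow_b", "hpc"),
    ("flow_c", "vm"),
    ("flow_d", "hpc"),
    ("flow_e", "vm"),
]


def run_pipeline(
    steps: Iterable[Tuple[str, str]],
    run_local: Callable[[str], str],
    dispatch_and_wait: Callable[[str], str],
) -> List[str]:
    """Run steps strictly in order; each HPC step finishes before the next starts."""
    results = []
    for name, target in steps:
        if target == "vm":
            results.append(run_local(name))
        elif target == "hpc":
            # Blocking call: in Prefect this role could be played by run_deployment
            results.append(dispatch_and_wait(name))
        else:
            raise ValueError(f"unknown target {target!r} for {name}")
    return results


if __name__ == "__main__":
    log = run_pipeline(
        PIPELINE,
        run_local=lambda name: f"{name}@vm",
        dispatch_and_wait=lambda name: f"{name}@hpc",
    )
    print(log)
```

If the dispatcher raises (for example because the HPC run failed), the exception propagates and the remaining VM steps never start.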
Some important notes about this implementation:
1. The exact configuration for the HPC work pool will depend on your specific HPC setup. You might need to create a custom infrastructure type that knows how to submit jobs to your HPC system.
2. You'll need to ensure that your HPC worker has access to the necessary job submission commands (like `sbatch` for SLURM, `qsub` for PBS, etc.).
3. For more complex HPC integration, you might want to create a custom infrastructure block that handles the specifics of your HPC system. This would involve:
- Creating a custom infrastructure class
- Implementing the necessary methods to submit jobs to your HPC system
- Handling job status checking and results retrieval
4. The flows that run on HPC should be designed to handle the HPC environment (e.g., using the appropriate paths, having access to required libraries, etc.).
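For the job status checking point above, two small parsing helpers are usually needed with SLURM: extracting the job ID from `sbatch` output (plain `sbatch` prints `Submitted batch job <id>`, while `sbatch --parsable` prints `<id>` or `<id>;<cluster>`), and mapping the state reported by `squeue`/`sacct` to done, failed or still running. A minimal sketch, assuming SLURM's standard job state names; the function names are illustrative.

```python
import re

# Final SLURM job states, grouped by outcome
SUCCESS_STATES = {"COMPLETED"}
FAILURE_STATES = {
    "FAILED", "CANCELLED", "TIMEOUT", "NODE_FAIL",
    "OUT_OF_MEMORY", "PREEMPTED", "BOOT_FAIL", "DEADLINE",
}


def parse_job_id(sbatch_output: str) -> str:
    """Return the job ID from `sbatch` or `sbatch --parsable` output."""
    text = sbatch_output.strip()
    match = re.search(r"Submitted batch job (\d+)", text)
    if match:
        return match.group(1)
    # --parsable prints "jobid" or "jobid;cluster"
    job_id = text.split(";", 1)[0]
    if not job_id.isdigit():
        raise ValueError(f"no job ID found in {sbatch_output!r}")
    return job_id


def classify_state(raw_state: str) -> str:
    """Map a raw SLURM state to 'done', 'failed', 'running' or 'unknown'."""
    words = raw_state.split()
    if not words:
        # e.g. squeue prints nothing once the job has left the queue
        return "unknown"
    # sacct may report "CANCELLED by 1000" or truncate to "CANCELLED+"
    state = words[0].upper().rstrip("+")
    if state in SUCCESS_STATES:
        return "done"
    if state in FAILURE_STATES:
        return "failed"
    return "running"  # PENDING, RUNNING, COMPLETING, CONFIGURING, ...
```

An "unknown" result is the cue to ask `sacct` instead of `squeue`, since `sacct` keeps the final state after the job leaves the queue.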
Would you like me to provide more specific details about any part of this setup, particularly about HPC integration with your specific system? Also, what HPC job scheduler are you using (SLURM, PBS, etc.)? This would help me provide more targeted guidance for your setup.
Tinh
08/05/2025, 11:42 AM
deploy.py, Dockerfile and docker-compose.yml
Marvin
08/05/2025, 11:42 AM
Marvin
08/05/2025, 11:44 AM
1. Project structure:
```text
project/
├── deploy.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── flows/
    ├── __init__.py
    └── hpc_flows.py
```
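2. Let's start with `requirements.txt`:
```text
prefect>=2.10,<3.0  # The code below uses Prefect 2 APIs (Deployment, prefect.infrastructure)
dask-jobqueue>=0.8.0  # Optional Dask-based HPC integration (not used by the code below)
paramiko>=3.0.0  # For SSH connections to HPC
```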
3. Here's the `Dockerfile`:
```dockerfile
# Use the official Prefect 2 image (Python 3.9) as the parent image
FROM prefecthq/prefect:2-python3.9
# Set environment variables
ENV PYTHONUNBUFFERED=1
# Set working directory
WORKDIR /opt/prefect
# Install system dependencies (including the SSH client for the HPC connection)
RUN apt-get update && apt-get install -y --no-install-recommends \
    openssh-client \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python packages
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy your Prefect flow code
COPY flows/ /opt/prefect/flows/
COPY deploy.py /opt/prefect/
# Create directory for SSH keys
RUN mkdir -p /root/.ssh && chmod 700 /root/.ssh
# Start a Prefect worker for the HPC pool (this container holds the SSH key and HPC settings)
CMD ["prefect", "worker", "start", "-p", "hpc-pool"]
```
4. Here's the `docker-compose.yml`:
```yaml
version: '3.8'
services:
  prefect-server:
    image: prefecthq/prefect:2-python3.9
    # Bind to all interfaces so the worker container and the host can reach the API
    command: prefect server start --host 0.0.0.0
    ports:
      - "4200:4200"
    volumes:
      - prefect_data:/root/.prefect
    networks:
      - prefect-network
  prefect-worker:
    build: .
    environment:
      - PREFECT_API_URL=http://prefect-server:4200/api
      - HPC_HOST=${HPC_HOST}
      - HPC_USER=${HPC_USER}
      - HPC_SSH_KEY_PATH=/root/.ssh/id_rsa
    volumes:
      - ~/.ssh/id_rsa:/root/.ssh/id_rsa:ro  # Mount SSH key for HPC access
      - ./flows:/opt/prefect/flows
    depends_on:
      - prefect-server
    networks:
      - prefect-network
networks:
  prefect-network:
    driver: bridge
volumes:
  prefect_data:
```
5. Here's the `flows/hpc_flows.py`:
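```python
import os
import subprocess

import paramiko
from prefect import flow, task


class HPCSubmitter:
    """Submits SLURM jobs over SSH with the credentials from the environment."""

    def __init__(self):
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(
            hostname=os.environ["HPC_HOST"],
            username=os.environ["HPC_USER"],
            key_filename=os.environ["HPC_SSH_KEY_PATH"],
        )

    def submit_job(self, script_content):
        # sbatch reads the batch script from stdin when no file is given, so the
        # script's quotes and $VARS are not mangled by the remote shell.
        # --parsable makes sbatch print just "jobid" (or "jobid;cluster").
        stdin, stdout, stderr = self.ssh.exec_command("sbatch --parsable")
        stdin.write(script_content)
        stdin.flush()
        stdin.channel.shutdown_write()
        if stdout.channel.recv_exit_status() != 0:
            raise RuntimeError(f"sbatch failed: {stderr.read().decode().strip()}")
        return stdout.read().decode().strip().split(";")[0]

    def check_job_status(self, job_id):
        # -h: no header; -o %T: job state only (empty once the job has left the queue)
        stdin, stdout, stderr = self.ssh.exec_command(f"squeue -h -j {job_id} -o %T")
        return stdout.read().decode().strip()

    def close(self):
        self.ssh.close()


@task
def submit_to_hpc(script_content):
    hpc = HPCSubmitter()
    try:
        return hpc.submit_job(script_content)
    finally:
        hpc.close()


@task
def run_on_vm(command):
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    return result.stdout


@flow(name="flow_a")
def flow_a():
    return run_on_vm("echo 'Running Flow A on VM'")


@flow(name="flow_b")
def flow_b():
    script = """#!/bin/bash
#SBATCH --job-name=flow_b
#SBATCH --output=flow_b_%j.out
#SBATCH --error=flow_b_%j.err
#SBATCH --time=01:00:00
#SBATCH --nodes=1
#SBATCH --ntasks=1
echo "Running Flow B on HPC"
python /path/to/your/hpc/script.py
"""
    return submit_to_hpc(script)


@flow(name="flow_c")
def flow_c():
    return run_on_vm("echo 'Running Flow C on VM'")


@flow(name="flow_d")
def flow_d():
    # Same layout as flow_b; adjust the resource requests for this job
    script = """#!/bin/bash
#SBATCH --job-name=flow_d
#SBATCH --output=flow_d_%j.out
#SBATCH --error=flow_d_%j.err
#SBATCH --time=01:00:00
#SBATCH --nodes=1
#SBATCH --ntasks=1
echo "Running Flow D on HPC"
python /path/to/your/hpc/script.py
"""
    return submit_to_hpc(script)


@flow(name="flow_e")
def flow_e():
    return run_on_vm("echo 'Running Flow E on VM'")
```
Marvin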
08/05/2025, 11:44 AM
6. Finally, here's the `deploy.py`:
```python
from prefect.deployments import Deployment
from prefect.infrastructure import DockerContainer, Process

from flows.hpc_flows import flow_a, flow_b, flow_c, flow_d, flow_e

# Create infrastructure blocks
process_infra = Process(
    env={"PREFECT_LOGGING_LEVEL": "DEBUG"}
)
docker_infra = DockerContainer(
    image="your-registry/prefect-hpc:latest",
    env={"PREFECT_LOGGING_LEVEL": "DEBUG"}
)

if __name__ == "__main__":
    # Deploy VM flows
    for vm_flow in [flow_a, flow_c, flow_e]:
        Deployment.build_from_flow(
            flow=vm_flow,
            name=f"{vm_flow.name}_deployment",
            infrastructure=process_infra,
            work_pool_name="vm-pool",
        ).apply()
    # Deploy HPC flows
    for hpc_flow in [flow_b, flow_d]:
        Deployment.build_from_flow(
            flow=hpc_flow,
            name=f"{hpc_flow.name}_deployment",
            infrastructure=docker_infra,
            work_pool_name="hpc-pool",
        ).apply()
    print("Deployments created successfully!")
```
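To set up and run this system:
1. Create a `.env` file with your HPC credentials:
```bash
HPC_HOST=your.hpc.cluster
HPC_USER=your_username
```
2. Build the images and start the Prefect server:
```bash
docker-compose build
docker-compose up -d prefect-server
```
3. Point your local Prefect CLI at that server and create the work pools:
```bash
prefect config set PREFECT_API_URL=http://localhost:4200/api
prefect work-pool create "vm-pool" --type process
prefect work-pool create "hpc-pool" --type docker
```
4. Start the worker and deploy the flows:
```bash
docker-compose up -d prefect-worker
python deploy.py
```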
Key features of this setup:
1. Security: uses SSH key authentication for HPC access
2. Isolation: the Prefect server and the worker each run in their own container
3. Flexibility: flows can run either on the VM or on HPC
4. Monitoring: flow runs are visible in the Prefect UI, and `check_job_status` queries SLURM for the HPC job state
5. Scalability: adding a flow means adding it to the VM or HPC list in `deploy.py`
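On the security point, key-based SSH is only half of it: commands sent with `exec_command` are interpreted by the remote shell, so building them as `'echo "%s" > job.slurm && sbatch job.slurm' % script_content` breaks as soon as the script contains double quotes (the flow_b script does) and lets `$VARIABLES` expand on the login node. A sketch using the standard library's `shlex.quote`; `build_submit_command` is a hypothetical helper name.

```python
import shlex


def build_submit_command(script: str, remote_path: str = "job.slurm") -> str:
    """Return one remote shell command that writes `script` to a file and submits it.

    shlex.quote wraps each value in single quotes (escaping embedded ones), so
    double quotes, $VARIABLES and newlines in the script reach the file as-is.
    printf '%s' is used instead of echo so the text is written without reinterpretation.
    """
    path = shlex.quote(remote_path)
    return f"printf '%s' {shlex.quote(script)} > {path} && sbatch --parsable {path}"
```

It would be used as `ssh.exec_command(build_submit_command(script))`; piping the script to `sbatch` on stdin avoids the temporary file altogether.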
Some important notes:
1. You'll need to modify the SLURM scripts in `hpc_flows.py` to match your HPC environment's specific requirements.
2. The SSH key mounting assumes you have a key at `~/.ssh/id_rsa`. Adjust if your key is in a different location.
3. You might need to adjust the Python paths in the SLURM scripts depending on your HPC environment.
4. Consider adding error handling and retry logic for HPC job submission.
5. You might want to add logging to track job progress and status.
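On the retry and progress points: as written, `flow_b` and `flow_d` return as soon as `sbatch` accepts the job, so flow C could start before B's HPC job has finished. A polling sketch that waits for a final SLURM state, with a timeout and tolerance for a few consecutive connection errors. `get_state` is an assumed callable, for example one that runs `sacct -n -X -j <id> -o State` over the existing SSH connection (`sacct` keeps reporting a job after it has left `squeue`); with paramiko you would likely pass `retry_on=(OSError, paramiko.SSHException)`.

```python
import time
from typing import Callable, Tuple, Type

# Final SLURM job states; anything else (PENDING, RUNNING, ...) means keep waiting
TERMINAL_STATES = {
    "COMPLETED", "FAILED", "CANCELLED", "TIMEOUT", "NODE_FAIL",
    "OUT_OF_MEMORY", "PREEMPTED", "BOOT_FAIL", "DEADLINE",
}


def wait_for_job(
    job_id: str,
    get_state: Callable[[str], str],
    poll_interval: float = 60.0,
    timeout: float = 6 * 3600.0,
    max_consecutive_errors: int = 3,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_state(job_id) until the job reaches a final state and return it.

    Raises TimeoutError after `timeout` seconds, and re-raises the last error
    if get_state fails more than `max_consecutive_errors` times in a row.
    """
    deadline = clock() + timeout
    errors = 0
    while True:
        try:
            words = get_state(job_id).split()
            errors = 0
        except retry_on:
            errors += 1
            if errors > max_consecutive_errors:
                raise
            words = []
        # sacct may print e.g. "CANCELLED by 1000" or a truncated "CANCELLED+"
        state = words[0].upper().rstrip("+") if words else ""
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"SLURM job {job_id} not finished after {timeout} s")
        sleep(poll_interval)
```

Inside a flow, raising when the returned state is not `COMPLETED` makes the HPC step fail visibly instead of letting the next VM flow start.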
Would you like me to explain any part of this setup in more detail? Or would you like to know how to modify it for a different HPC scheduler system?
Tinh
08/05/2025, 12:03 PM
Marvin
08/05/2025, 12:03 PM
Marvin
08/05/2025, 12:04 PM
`process` work pool type because:
- It allows direct execution of commands on the system where the worker is running
- It can directly interact with HPC submission commands (like `sbatch`, `qsub`, etc.)
- It doesn't add unnecessary containerization overhead
2. Docker Would Add Complications:
- Would require Docker on the machine where the hpc-pool worker runs (if that is an HPC login node, Docker is often not available)
- Adds an unnecessary layer between the Prefect worker and the HPC system
- Could complicate access to HPC-specific commands and environments (each flow-run container would need its own SSH key, scheduler client and paths)
Here's the corrected version of the work pool creation and relevant parts of the setup:
bash
# Create work pools
prefect work-pool create "vm-pool" --type process
prefect work-pool create "hpc-pool" --type process # Changed from docker to process
And the corrected `deploy.py`:
```python
import os

from prefect.deployments import Deployment
from prefect.infrastructure import Process

from flows.hpc_flows import flow_a, flow_b, flow_c, flow_d, flow_e

# Create infrastructure block for both VM and HPC.
# Python does not expand "${...}" in string literals, so read the values from
# the environment (export HPC_HOST and HPC_USER before running this script).
process_infra = Process(
    env={
        "PREFECT_LOGGING_LEVEL": "DEBUG",
        "HPC_HOST": os.environ["HPC_HOST"],
        "HPC_USER": os.environ["HPC_USER"],
        "HPC_SSH_KEY_PATH": "/root/.ssh/id_rsa",
    }
)

if __name__ == "__main__":
    # Deploy VM flows
    for vm_flow in [flow_a, flow_c, flow_e]:
        Deployment.build_from_flow(
            flow=vm_flow,
            name=f"{vm_flow.name}_deployment",
            infrastructure=process_infra,
            work_pool_name="vm-pool",
        ).apply()
    # Deploy HPC flows
    for hpc_flow in [flow_b, flow_d]:
        Deployment.build_from_flow(
            flow=hpc_flow,
            name=f"{hpc_flow.name}_deployment",
            infrastructure=process_infra,  # Using the same process infrastructure
            work_pool_name="hpc-pool",
        ).apply()
    print("Deployments created successfully!")
```
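Values such as `"${HPC_HOST}"` are not expanded by Python inside an ordinary string literal, so a `Process(env=...)` built that way would hand the worker the literal text. A small stdlib helper that reads the three variables from the environment and fails fast when one is missing or empty; the helper name is hypothetical.

```python
import os
from typing import Dict, Mapping

REQUIRED_HPC_VARS = ("HPC_HOST", "HPC_USER", "HPC_SSH_KEY_PATH")


def hpc_env(environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect the HPC connection settings, raising if any are missing or empty."""
    missing = [name for name in REQUIRED_HPC_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_HPC_VARS}
```

It could then be merged into the infrastructure settings, e.g. `Process(env={"PREFECT_LOGGING_LEVEL": "DEBUG", **hpc_env()})`.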
The `docker-compose.yml` can still be used for running the Prefect server and worker, but the worker will now use process-based execution instead of trying to spawn Docker containers. This means:
1. The worker will directly execute the HPC submission commands
2. It can directly access the SSH client and HPC commands
3. It maintains a simpler and more direct connection to the HPC system
This approach is more aligned with typical HPC workflows where:
- Jobs are submitted directly to the HPC scheduler
- The environment is controlled by the HPC system's module system
- Direct access to the HPC submission commands is needed
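Since the HPC side is driven by the scheduler and its module system, it can help to generate the batch scripts instead of hand-writing one per flow. A sketch that renders the `#SBATCH` header (which SLURM only honours before the first executable line), optional `module load` lines and then the commands; the helper name, option values and the module name `python/3.9` are placeholders, since module names differ per cluster.

```python
from typing import Mapping, Optional, Sequence


def render_sbatch_script(
    job_name: str,
    commands: Sequence[str],
    options: Optional[Mapping[str, str]] = None,
    modules: Sequence[str] = (),
) -> str:
    """Build a SLURM batch script: #SBATCH header, module loads, then commands."""
    directives = {
        "job-name": job_name,
        "output": f"{job_name}_%j.out",
        "error": f"{job_name}_%j.err",
        **(options or {}),  # caller-supplied options override the defaults
    }
    lines = ["#!/bin/bash"]
    lines += [f"#SBATCH --{key}={value}" for key, value in directives.items()]
    lines += [f"module load {name}" for name in modules]
    lines += list(commands)
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_sbatch_script(
        "flow_d",
        ['echo "Running Flow D on HPC"', "python /path/to/your/hpc/script.py"],
        options={"time": "01:00:00", "nodes": "1", "ntasks": "1"},
        modules=["python/3.9"],
    ))
```

The rendered text can be passed straight to the existing `submit_to_hpc` task.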
Would you like me to provide more details about how to configure the process-based worker for your specific HPC environment or explain any other aspects of this setup?