Ridhwan
07/28/2025, 3:51 AM
Marvin
07/28/2025, 3:51 AM
Marvin
07/28/2025, 3:52 AM
1. Using the subprocess module (recommended)
2. Using the community prefect-shell package
Here's a complete example using subprocess, which is the most straightforward approach:
```python
from prefect import flow, task
import subprocess
from datetime import datetime

@task
def run_pg_dump(database_name: str, output_path: str) -> str:
    """Run pg_dump and return the output path"""
    try:
        # Construct the pg_dump command
        command = [
            "pg_dump",
            "-d", database_name,
            "-f", output_path,
            "--format=custom"
        ]
        # Run the command
        result = subprocess.run(
            command,
            check=True,  # This will raise an exception if the command fails
            capture_output=True,
            text=True
        )
        return output_path
    except subprocess.CalledProcessError as e:
        print(f"Error output: {e.stderr}")
        raise Exception(f"pg_dump failed with exit code {e.returncode}")

@flow
def backup_database(database_name: str):
    """Flow to backup a PostgreSQL database"""
    # Create a timestamp for the backup file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = f"backup_{database_name}_{timestamp}.dump"
    # Run the backup
    result_path = run_pg_dump(database_name, output_path)
    print(f"Database backup completed successfully: {result_path}")
    return result_path

# Run the flow
if __name__ == "__main__":
    backup_database("your_database_name")
```
Alternative approach using prefect-shell (requires installation):
```python
# First install: pip install prefect-shell
import shlex
from datetime import datetime

from prefect import flow
from prefect_shell import ShellOperation

@flow
def backup_database(database_name: str):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = f"backup_{database_name}_{timestamp}.dump"
    # ShellOperation runs the command through a shell, so quote interpolated values
    command = (
        f"pg_dump -d {shlex.quote(database_name)} "
        f"-f {shlex.quote(output_path)} --format=custom"
    )
    # run() waits for the command to finish; a non-zero exit code is
    # expected to surface as an exception, which fails the flow run
    ShellOperation(commands=[command]).run()
    print(f"Database backup completed successfully: {output_path}")
    return output_path
```
Key points about running shell commands in Prefect:
1. Error Handling: Always include proper error handling as shown in the examples. Shell commands can fail for various reasons.
2. Security Considerations:
- Be careful with string formatting in commands to avoid shell injection
- Use subprocess.run() with a list of arguments rather than shell=True when possible
- Consider using environment variables for sensitive information
3. Logging: Prefect captures messages emitted through its run logger (get_run_logger()), and it captures print() output only when log_prints=True is set on the flow or task. Consider logging the command's stdout and stderr explicitly for debugging.
4. Environment: Make sure the shell command (like pg_dump) is available in the environment where your flow will run. This is especially important when deploying flows.
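To make the security and environment points above concrete, here is a minimal sketch that uses only the standard library. The helper names (`require_executable`, `run_with_secret`) are hypothetical, and a child Python process stands in for pg_dump so the example runs without PostgreSQL installed.

```python
import os
import shutil
import subprocess
import sys


def require_executable(name: str) -> str:
    """Return the full path of an executable, or raise if it is not on PATH."""
    path = shutil.which(name)
    if path is None:
        raise FileNotFoundError(f"{name!r} was not found on PATH")
    return path


def run_with_secret(command: list[str], secret_env: dict[str, str]) -> str:
    """Run an argument list without a shell, passing secrets only to the child."""
    # Copy the environment so the parent process is left unchanged.
    env = {**os.environ, **secret_env}
    result = subprocess.run(
        command, check=True, capture_output=True, text=True, env=env
    )
    return result.stdout


if __name__ == "__main__":
    # Stand-in for pg_dump: a child Python process that echoes its argument.
    hostile_name = "mydb; echo injected"
    output = run_with_secret(
        [sys.executable, "-c", "import sys; print(sys.argv[1])", hostile_name],
        {"PGPASSWORD": "example-only"},  # placeholder value, not a real secret
    )
    # The argument reaches the child as one literal string; no shell parses it.
    print(output.strip())
```

Because the command is a list and no shell is involved, the `;` in the database name is never interpreted. Calling `require_executable("pg_dump")` at the start of a flow would be one way to fail early when the binary is missing from the deployment environment.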
You can extend these examples to run any shell command. Here's a more generic template:
```python
import subprocess

from prefect import flow, task

@task
def run_shell_command(command: list[str]) -> str:
    """Generic task to run any shell command"""
    try:
        result = subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True
        )
        return result.stdout
    except subprocess.CalledProcessError as e:
        print(f"Error output: {e.stderr}")
        raise Exception(f"Command failed with exit code {e.returncode}")

@flow
def run_command_flow(command: list[str]):
    """Flow to run any shell command"""
    result = run_shell_command(command)
    print(f"Command output: {result}")
    return result
```
This can be used for any command:
```python
# Example usage
```
Marvin
07/28/2025, 3:52 AM
Ridhwan
07/28/2025, 6:20 AM
Marvin
07/28/2025, 6:20 AM
Marvin
07/28/2025, 6:21 AM
Marvin
07/28/2025, 6:21 AM
This implementation includes several important features:
1. *Proper Sequencing*:
- The flow ensures that pg_dump completes before pg_restore starts
- Each step is a separate task for better observability
- Tasks wait for completion before proceeding
2. *Verification Steps*:
- Verifies the dump file exists and isn't empty
- Checks the restored database size
- Includes error handling at each step
3. *Flexibility*:
- Supports optional authentication parameters
- Configurable host and port
- Customizable backup directory
4. *Error Handling*:
- Comprehensive error catching and reporting
- Proper subprocess management
- Verification of each step's success
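As a rough illustration of the verification and flexibility points above, here is a minimal standard-library sketch. The helper names (`connection_args`, `verify_dump_file`) and the example values are hypothetical and are not taken from the implementation discussed in this thread; `-h`, `-p`, and `-U` are the usual pg_dump and pg_restore connection flags.

```python
from pathlib import Path
from typing import Optional


def connection_args(
    host: Optional[str] = None,
    port: Optional[int] = None,
    username: Optional[str] = None,
) -> list[str]:
    """Build the optional connection flags shared by pg_dump and pg_restore."""
    args: list[str] = []
    if host:
        args += ["-h", host]
    if port:
        args += ["-p", str(port)]
    if username:
        args += ["-U", username]
    return args


def verify_dump_file(dump_path: str) -> int:
    """Return the dump size in bytes, raising if the file is missing or empty."""
    path = Path(dump_path)
    if not path.is_file():
        raise FileNotFoundError(f"Dump file not found: {dump_path}")
    size = path.stat().st_size
    if size == 0:
        raise ValueError(f"Dump file is empty: {dump_path}")
    return size


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    conn = connection_args("localhost", 5432, "postgres")
    dump_cmd = ["pg_dump", *conn, "-d", "source_db",
                "-f", "backup.dump", "--format=custom"]
    restore_cmd = ["pg_restore", *conn, "-d", "target_db", "backup.dump"]
    print(dump_cmd)
    print(restore_cmd)
```

In a flow, a check like `verify_dump_file` could run as its own task between the dump and restore steps, so a missing or empty dump stops the run before pg_restore is attempted.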
To use this in production, you might want to add:
```python
import os

from prefect import flow
from prefect.blocks.system import Secret

@flow
def secure_backup_and_restore(
    source_database: str,
    target_database: str
):
    # Load credentials securely
    postgres_password = Secret.load("postgres-password")
    # Set password in environment
    os.environ["PGPASSWORD"] = postgres_password.get()
    return backup_and_restore_database(
        source_database=source_database,
        target_database=target_database,
        username="postgres"
    )
```
Key considerations:
1. Environment Variables: PostgreSQL commands typically use environment variables like PGPASSWORD for authentication. In production, use Prefect Blocks for secure credential management.
2. Error Handling: The implementation includes comprehensive error handling, but you might want to add specific handling for your use case.
Marvin
07/28/2025, 6:21 AM
```python
from prefect import task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=30,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def run_pg_dump(...):
    # ... same implementation as above
```
This implementation ensures that your database operations are reliable, observable, and maintainable within your Prefect workflows.