ASHIK
08/06/2025, 11:44 PM
Marvin
08/06/2025, 11:44 PM
Marvin
08/06/2025, 11:45 PM
python
from prefect import flow
from contextlib import contextmanager
import signal

@contextmanager
def handle_shutdown():
    def handler(signum, frame):
        # Perform cleanup
        print("Performing cleanup...")
        raise SystemExit("Received shutdown signal")

    # Register SIGTERM handler
    original_handler = signal.signal(signal.SIGTERM, handler)
    try:
        yield
    finally:
        # Restore original handler
        signal.signal(signal.SIGTERM, original_handler)

@flow
def my_flow():
    with handle_shutdown():
        # Your flow logic here
        ...
3. Use Timeouts and Circuit Breakers
- Set appropriate timeouts for tasks that might hang
- Implement circuit breakers for external service calls (a hand-rolled sketch follows the timeout example below)
python
from prefect import flow, task

@task(timeout_seconds=300)  # 5 minute timeout
def my_task():
    # Task logic here
    pass
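Prefect does not ship a built-in circuit breaker, so here is a minimal hand-rolled sketch of the idea; the thresholds and call_external_service are placeholders you would replace with your own client code:
python
from prefect import task
import time

def call_external_service():
    # Placeholder for your real external call (HTTP request, DB query, ...)
    return "ok"

class CircuitBreaker:
    """Tiny circuit breaker: opens after max_failures consecutive failures
    and stays open for reset_seconds before allowing another attempt."""
    def __init__(self, max_failures=3, reset_seconds=60):
        self.max_failures = max_failures
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_seconds:
                raise RuntimeError("Circuit open: skipping external call")
            # Reset window has elapsed: allow a trial call
            self.opened_at = None
            self.failures = 0
        try:
            result = fn(*args, **kwargs)
            self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise

breaker = CircuitBreaker()

@task(timeout_seconds=300, retries=2)
def call_with_breaker():
    return breaker.call(call_external_service)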
4. Best Practices for Infrastructure
- Set appropriate resource limits in container configurations
- Configure proper memory limits in your deployment configurations
- Use health checks to monitor task and flow health
5. Monitor and Log
- Implement proper logging to track resource usage (see the sketch after this list)
- Use Prefect's built-in monitoring capabilities
- Set up alerts for resource-intensive operations
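As a concrete starting point for the logging bullet above, here is a minimal sketch using Prefect's run logger plus psutil; the 85% threshold and the flow body are illustrative only:
python
from prefect import flow, task, get_run_logger
import psutil

@task
def log_resource_usage(label: str):
    logger = get_run_logger()
    mem = psutil.virtual_memory()
    logger.info(f"[{label}] memory used: {mem.percent}% ({mem.used / 1024**2:.0f} MB)")
    if mem.percent > 85:  # illustrative alert threshold
        logger.warning(f"[{label}] memory usage above 85%")

@flow
def instrumented_flow():
    log_resource_usage("start")
    # ... your actual work goes here ...
    log_resource_usage("end")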
Important Notes:
- SIGKILL (-9) cannot be caught or handled directly - it's a forced termination
- Focus on preventing conditions that lead to SIGKILL rather than trying to handle it
- Use Prefect's built-in timeout mechanisms rather than implementing your own
- Consider using the on_crashed hook in your flows to handle cleanup when possible (a minimal sketch follows these notes)
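A minimal sketch of that on_crashed hook, assuming the standard Prefect state-hook signature of (flow, flow_run, state); the cleanup body is a placeholder:
python
from prefect import flow

def cleanup_on_crash(flow, flow_run, state):
    # Called when the flow run ends in a Crashed state (e.g. after SIGTERM)
    # Put best-effort cleanup here: close connections, remove temp files, etc.
    print(f"Flow run {flow_run.name} crashed with state {state.name}, cleaning up...")

@flow(on_crashed=[cleanup_on_crash])
def my_flow():
    # Your flow logic here
    ...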
If you're running into specific SIGKILL scenarios, it would be helpful to know:
1. Are you running in containers?
2. What type of workload is causing the SIGKILL?
3. Are you seeing memory issues or other resource constraints?
This would help me provide more targeted recommendations for your specific use case.
ASHIK
08/06/2025, 11:47 PM
Marvin
08/06/2025, 11:47 PM
Marvin
08/06/2025, 11:48 PM
python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

# Limit how many tasks run at once by configuring the task runner
@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def my_flow():
    # Your flow logic here
    pass
b. Add Memory Monitoring
python
from prefect import task
import psutil
@task
def memory_intensive_task():
    if psutil.virtual_memory().percent > 80:
        raise RuntimeError("Memory usage too high, stopping task")
    # Your task logic here
2. Infrastructure Adjustments
- If using containers, increase memory limits:
yaml
# docker-compose.yml example
services:
  prefect-flow:
    mem_limit: 4g
    mem_reservation: 2g
- If using Kubernetes, set resource requests/limits:
yaml
resources:
  requests:
    memory: "2Gi"
  limits:
    memory: "4Gi"
3. Code Optimization Strategies
a. Batch Processing
python
from prefect import flow, task

@task
def process_batch(batch):
    return [item.process() for item in batch]

@flow
def batch_flow(data):
    batch_size = 1000
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    for batch in batches:
        process_batch.submit(batch)
b. Memory-Efficient Processing
python
from prefect import flow, task
import gc

@task
def memory_efficient_task():
    # Process in chunks
    for chunk in get_data_chunks():
        process_chunk(chunk)
        # Force garbage collection after each chunk
        gc.collect()
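get_data_chunks and process_chunk above are placeholders; a hypothetical generator-based implementation could look like this (the file path and chunk size are made up for illustration):
python
from typing import Iterator

def get_data_chunks(path: str = "data/records.csv", chunk_size: int = 10_000) -> Iterator[list[str]]:
    """Yield the file in fixed-size batches of lines so that only one
    batch is ever held in memory at a time."""
    chunk = []
    with open(path) as f:
        for line in f:
            chunk.append(line)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
    if chunk:  # remaining partial batch
        yield chunk

def process_chunk(chunk: list[str]) -> None:
    # Placeholder: replace with your real per-batch processing
    print(f"Processing {len(chunk)} records")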
4. Monitoring and Prevention
python
from prefect import flow
import psutil

@flow(log_prints=True)
def monitored_flow():
    print(f"Memory usage at start: {psutil.virtual_memory().percent}%")
    try:
        # Your flow logic here
        pass
    finally:
        print(f"Memory usage at end: {psutil.virtual_memory().percent}%")
5. Best Practices to Implement
- Use generator expressions instead of lists for large datasets (see the sketch after this list)
- Implement proper cleanup in your tasks
- Add memory logging at critical points
- Use Prefect's built-in retries for transient issues
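To make the first bullet concrete, here is a small sketch contrasting the two approaches; the file name and the transform are stand-ins for your real data and work:
python
def transform(line: str) -> str:
    # Stand-in for real per-row work
    return line.strip().upper()

# Loads every transformed row into memory at once (O(n) memory):
all_rows = [transform(line) for line in open("big_file.txt")]

# Streams one row at a time; peak memory stays roughly constant:
for row in (transform(line) for line in open("big_file.txt")):
    ...  # handle each row, then let it be garbage-collected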
Here's a complete example putting it all together:
python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import psutil
import gc
from typing import Generator
import logging

@task(retries=3, retry_delay_seconds=60)
def memory_safe_task(data: Generator):
    memory_threshold = 80  # 80% memory usage threshold
    try:
        for chunk in data:
            # Check memory usage
            if psutil.virtual_memory().percent > memory_threshold:
                gc.collect()  # Force garbage collection
                if psutil.virtual_memory().percent > memory_threshold:
                    raise RuntimeError(f"Memory usage too high: {psutil.virtual_memory().percent}%")
            # Process chunk
            process_chunk(chunk)
    except Exception as e:
        logging.error(f"Task failed: {str(e)}")
        raise
    finally:
        gc.collect()  # Final cleanup

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=5),  # Limit concurrent task execution
    retries=2,
    log_prints=True
)
def memory_aware_flow():
    print(f"Starting flow with memory usage: {psutil.virtual_memory().percent}%")
    # Generate data in chunks instead of loading all at once
    data_generator = (chunk for chunk in get_data_chunks())
    # Process with memory-safe task
    memory_safe_task(data_generator)
If you're still seeing SIGKILL errors after implementing these solutions, it would be helpful to know:
1. What is the size of your data?
2. How many concurrent tasks are you running?
3. What are your current infrastructure resources (memory, CPU)?
This information would help me provide more specific optimizations for your use case.
ASHIK
08/06/2025, 11:53 PM
Marvin
08/06/2025, 11:53 PM
Marvin
08/06/2025, 11:54 PM
yaml
- name: "data-pipeline-worker-oklahoma"
replicas: 1
resources:
limits:
cpu: 6000m
memory: 8Gi
requests:
cpu: 6000m
memory: 8Gi
workerConfig:
workerName: "data-pipeline-worker-oklahoma"
workerProcesses: 3
defaultQueueConcurrency: 1
# Add health check settings
healthCheck:
enabled: true
period: 30
env:
# Memory optimization environment variables
- name: PYTHONUNBUFFERED
value: "1"
- name: PREFECT_WORKER_PREFETCH_SECONDS
value: "10"
# Add memory monitoring
- name: PREFECT_LOGGING_LEVEL
value: "INFO"
# Garbage collection settings
- name: PYTHONGC
value: "1"
4. Additional Recommendations:
python
# In your flow code, add memory monitoring:
from prefect import flow, task
import psutil
import gc

@task
def monitor_memory():
    memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MB
    print(f"Current memory usage: {memory_usage:.2f} MB")
    if memory_usage > 2000:  # 2GB warning threshold
        gc.collect()
        print("Forced garbage collection")

@flow(retries=2)
def your_flow():
    monitor_memory()
    # Your existing flow logic
    result = process_data()
    monitor_memory()
    return result
5. Monitoring Best Practices:
- Set up monitoring for OOM events using your cluster's monitoring tools
- Watch for memory trends over time
- Consider implementing horizontal pod autoscaling based on memory usage
If you're still seeing SIGKILL errors after implementing these changes, we should look at:
1. The actual memory usage patterns of your flows
2. The size and nature of the data being processed
3. The specific tasks that are running when the SIGKILL occurs
Would you like me to provide more specific guidance about any of these areas?
ASHIK
08/06/2025, 11:55 PM
Marvin
08/06/2025, 11:55 PM
Marvin
08/06/2025, 11:58 PM
1. PYTHONUNBUFFERED=1
- This is a Python-specific environment variable (not Prefect-specific)
- Forces Python to run stdout and stderr unbuffered
- Useful for seeing logs immediately but doesn't actually help with memory management
- Recommendation: Not necessary for memory management, though can be helpful for debugging
2. PREFECT_WORKER_PREFETCH_SECONDS=10
- This is a real Prefect worker setting, but suggesting it for memory optimization was misleading advice from me
- It only controls how far in advance the worker pre-fetches scheduled runs and has no effect on memory usage (10 seconds is also the default)
- Recommendation: Remove it; it doesn't address the SIGKILL/memory problem
3. PREFECT_LOGGING_LEVEL=INFO
- This is a valid Prefect environment variable
- Controls the logging verbosity
- Recommendation: Keep this if you want detailed logs
4. PYTHONGC=1
- This was incorrect advice from me
- This is not a valid Python environment variable
- For garbage collection control, you should use code-level approaches instead
Here's the corrected configuration:
yaml
- name: "data-pipeline-worker-oklahoma"
replicas: 1
resources:
limits:
cpu: 6000m
memory: 8Gi
requests:
cpu: 6000m
memory: 8Gi
workerConfig:
workerName: "data-pipeline-worker-oklahoma"
workerProcesses: 3
defaultQueueConcurrency: 1
env:
- name: PREFECT_LOGGING_LEVEL
value: "INFO"
For actual memory management, you should instead:
1. Use Code-Level Memory Management:
python
from prefect import flow, task
import gc
import psutil

@task
def memory_intensive_task():
    try:
        # Your task logic here
        result = process_data()
        # Explicit garbage collection after intensive operations
        gc.collect()
        return result
    finally:
        # Monitor memory usage
        memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        print(f"Task memory usage: {memory_usage:.2f} MB")
2. Use Prefect's Built-in Task Settings:
python
@task(
    retries=2,
    retry_delay_seconds=30,
    timeout_seconds=3600  # Set appropriate timeout
)
def your_task():
    # Task logic here
    pass
3. Configure Worker Resources Properly:
yaml
resources:
  limits:
    memory: 8Gi
  requests:
    memory: 8Gi
The key is to focus on:
- Proper resource allocation in Kubernetes
- Code-level memory management
- Task-level configuration in Prefect
- Monitoring and logging
Rather than relying on environment variables that don't actually help with memory management, I recommend:
1. Monitoring memory usage in your flows
2. Using Prefect's built-in task configuration
3. Setting appropriate Kubernetes resource limits
4. Implementing proper error handling and retries
Would you like me to provide more specific examples of any of these approaches?