Viktor
08/04/2025, 6:41 PM
Marvin
08/04/2025, 6:41 PM
Marvin
08/04/2025, 6:41 PM
- Set persist_result=False and cache_result_in_memory=False on tasks when possible
- Be cautious with large data objects being passed between tasks, as they can remain in memory
- Consider using external storage for large intermediate results (see the sketch below)
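For illustration, here is a minimal sketch of that pattern, assuming pandas and a hypothetical /prefect_data/intermediate directory: each task writes its large intermediate result to external storage and passes only the file path downstream, so the data itself never has to travel between tasks.
python
import gc
import pandas as pd
from prefect import flow, task

@task(persist_result=False, cache_result_in_memory=False)
def build_chunk(chunk_id: int) -> str:
    # Produce a large intermediate result...
    df = pd.DataFrame({"chunk": [chunk_id] * 1_000_000})
    path = f"/prefect_data/intermediate/chunk_{chunk_id}.parquet"  # hypothetical location
    df.to_parquet(path)  # ...but persist it externally instead of returning it
    del df
    gc.collect()
    return path  # only the small path string travels between tasks

@task(persist_result=False, cache_result_in_memory=False)
def consume_chunk(path: str) -> int:
    df = pd.read_parquet(path)  # reload only when needed
    return len(df)

@flow
def external_storage_flow(n_chunks: int = 3):
    for chunk_id in range(n_chunks):
        path = build_chunk(chunk_id)
        consume_chunk(path)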
2. Database Connection Management:
- Scale down the number of active database connections when possible
- Implement connection pooling with appropriate timeout settings (see the sketch after this list)
- Close connections explicitly when they're no longer needed
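As a concrete illustration, here is a minimal pooling sketch assuming SQLAlchemy; the DATABASE_URL, pool sizes, and timeouts are placeholders to adapt to your own database and driver.
python
from sqlalchemy import create_engine, text

# Hypothetical connection string; replace with your own.
DATABASE_URL = "postgresql+psycopg2://user:password@db-host:5432/app"

# A small, bounded pool: requests beyond pool_size + max_overflow wait up to
# pool_timeout seconds, and idle connections are recycled after pool_recycle seconds.
engine = create_engine(
    DATABASE_URL,
    pool_size=5,
    max_overflow=2,
    pool_timeout=30,
    pool_recycle=1800,
)

def fetch_row_count(table_name: str) -> int:
    # The context manager returns the connection to the pool on exit.
    with engine.connect() as conn:
        return conn.execute(text(f"SELECT count(*) FROM {table_name}")).scalar_one()

# When the flow is done with the database, release all pooled connections.
engine.dispose()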
3. Best Practices for Large Data Handling:
- Use streaming approaches when possible instead of loading entire datasets into memory
- Implement garbage collection calls strategically in your flows
- Consider breaking large flows into smaller sub-flows that can release memory between executions (see the sketch below)
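A minimal sketch of the sub-flow pattern (process_chunk and the chunk ids are placeholders): each sub-flow run returns and goes out of scope before the next one starts, giving Python a chance to release its memory between chunks.
python
import gc
from prefect import flow, task

@task(persist_result=False, cache_result_in_memory=False)
def process_chunk(chunk_id: int) -> int:
    # Placeholder for the real per-chunk work.
    return chunk_id * 2

@flow
def process_one_chunk(chunk_id: int) -> int:
    # Sub-flow: its local state becomes collectible once it returns.
    return process_chunk(chunk_id)

@flow
def parent_flow(chunk_ids: list[int]):
    for chunk_id in chunk_ids:
        process_one_chunk(chunk_id)
        gc.collect()  # encourage cleanup between sub-flow runs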
4. Code Example for Memory-Conscious Flow:
python
from prefect import flow, task
import gc

@task(persist_result=False, cache_result_in_memory=False)
def process_data(data):
    # Process your data
    result = data.compute()  # If using dask or similar
    return result

@flow
def memory_conscious_flow():
    # Process in chunks or streams when possible
    for chunk in data_chunks:
        result = process_data(chunk)
        # Do something with result
        # Explicitly clean up
        del result
        gc.collect()
5. Infrastructure Considerations:
- If using Kubernetes, set appropriate memory limits and requests
- Monitor memory usage with appropriate observability tools
- Consider using worker pools with memory-based scaling policies
6. For Large DataFrame Operations:
- Consider using Dask DataFrames instead of Pandas for large datasets (see the sketch after this list)
- Use del statements strategically after large operations
- Implement checkpointing for very large data processing pipelines
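For example, a short Dask sketch (the file paths are placeholders; the uc_id column is taken from your code): the dataframe is read lazily and only two scalars are ever materialized in the flow's memory.
python
import gc
import dask
import dask.dataframe as dd

def summarize_uc_ids(parquet_paths: list[str]) -> tuple[int, int]:
    # Lazy read of a single column: nothing is materialized yet.
    ddf = dd.read_parquet(parquet_paths, columns=["uc_id"])
    # Only two scalars are brought into memory, not the full dataset.
    min_id, max_id = dask.compute(ddf["uc_id"].min(), ddf["uc_id"].max())
    del ddf
    gc.collect()
    return int(min_id), int(max_id)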
Important Links:
- Memory Management Blog Post
- Infrastructure Cleanup Jobs
- Related GitHub Issue Discussion
Note: While these strategies can help manage memory usage, there's currently no built-in automatic memory cleanup mechanism in Prefect. The best approach is to be proactive about memory management in your flow design and implementation.
Would you like me to elaborate on any of these strategies or provide more specific examples for your use case?
Viktor
08/04/2025, 8:09 PM
for min_uc_id, max_uc_id in groups:
    result = group_matched_components.submit(
        enricehd_parquet_files,
        min_uc_id=min_uc_id,
        max_uc_id=max_uc_id
    ).result()
    del result
    gc.collect()
    # To ease memory pressure.
    time.sleep(5)
Marvin
08/04/2025, 8:09 PM
Marvin
08/04/2025, 8:11 PM
1. Instead of calling submit().result() directly, which might keep references to the data in memory, try this pattern:
python
import asyncio
import gc
import psutil
from prefect.utilities.asyncutils import sync_compatible

@sync_compatible
async def process_groups(groups):
    for min_uc_id, max_uc_id in groups:
        # Submit the task
        future = group_matched_components.submit(
            enricehd_parquet_files,
            min_uc_id=min_uc_id,
            max_uc_id=max_uc_id
        )
        # Get the result and immediately clear it
        result = await future
        future = None  # Clear the future reference
        # Clear any references to the result
        del result
        gc.collect()
        # Optional: Force Python to return memory to the OS (glibc only)
        import ctypes
        libc = ctypes.CDLL("libc.so.6")
        libc.malloc_trim(0)
        # Log memory usage for debugging
        process = psutil.Process()
        print(f"Memory usage: {process.memory_info().rss / 1024 / 1024 / 1024:.2f} GB")
        await asyncio.sleep(5)  # Async sleep instead of time.sleep
2. Task Configuration
Modify your task definition to minimize memory retention:
python
@task(
    persist_result=False,
    cache_result_in_memory=False,
    retries=0  # Disable retries if you don't need them
)
def enricehd_parquet_files(min_uc_id, max_uc_id):
    # Your processing logic here
    pass
3. Data Processing Strategies
If you're processing large parquet files, consider these modifications:
python
import gc
import pyarrow.parquet as pq

@task(persist_result=False, cache_result_in_memory=False)
def enricehd_parquet_files(min_uc_id, max_uc_id):
    try:
        # Read and process in chunks if possible
        with pq.ParquetFile(path) as pf:
            for batch in pf.iter_batches():
                # Process each batch
                result = process_batch(batch)
                # Write results immediately to storage
                save_results(result)
                del result
                gc.collect()
    finally:
        # Ensure cleanup happens
        gc.collect()
4. Additional Memory Management Techniques
a. Use a context manager for memory-intensive operations:
python
import gc
from contextlib import contextmanager

@contextmanager
def memory_manager():
    try:
        yield
    finally:
        gc.collect()
        import ctypes
        libc = ctypes.CDLL("libc.so.6")
        libc.malloc_trim(0)

# Usage in your code
with memory_manager():
    result = group_matched_components.submit(...).result()
b. Consider implementing a memory monitor:
python
import psutil

def check_memory_usage():
    process = psutil.Process()
    memory_gb = process.memory_info().rss / 1024 / 1024 / 1024
    if memory_gb > 45:  # Alert if memory usage is too high
        logger.warning(f"High memory usage detected: {memory_gb:.2f} GB")
5. System-Level Considerations
- If possible, set a memory limit for your Python process using resource.setrlimit (see the sketch after this list)
- Consider using a memory profiler to identify specific memory leaks
- Monitor memory usage patterns to identify if there are specific operations causing the retention
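Here is a minimal, Unix-only sketch of both ideas using only the standard library; the 50 GB cap and the cap_process_memory/snapshot_top_allocations helpers are illustrative, not existing utilities.
python
import resource
import tracemalloc

def cap_process_memory(max_bytes: int = 50 * 1024**3):
    # Limit the process address space; allocations past the cap raise MemoryError
    # instead of growing unbounded. Unix-only (resource module).
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    resource.setrlimit(resource.RLIMIT_AS, (max_bytes, hard))

def snapshot_top_allocations(limit: int = 5):
    # Requires tracemalloc.start() early in the process.
    snapshot = tracemalloc.take_snapshot()
    for stat in snapshot.statistics("lineno")[:limit]:
        print(stat)

tracemalloc.start()
cap_process_memory()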
6. Alternative Approach
If the above solutions don't help, consider restructuring your code to use streaming or chunking:
python
@task
def process_group_chunked(min_uc_id, max_uc_id, chunk_size=1000):
    for chunk_start in range(min_uc_id, max_uc_id, chunk_size):
        chunk_end = min(chunk_start + chunk_size, max_uc_id)
        # Process smaller chunk
        process_chunk(chunk_start, chunk_end)
        gc.collect()
The persistent high memory usage despite garbage collection suggests that either:
1. There are still references to your data somewhere in the Prefect runtime
Marvin
08/04/2025, 8:11 PM
- What kind of data is enricehd_parquet_files processing?
- What is the typical size of the data being processed in each iteration?
- Are you using any particular executor (Dask, Ray, etc.)?
This information would help me provide more specific recommendations for your use case.
Viktor
08/04/2025, 8:14 PM
@task(
    persist_result=False,
    cache_result_in_memory=False,
)
def calculate_adaptive_bins(parquet_paths: List[str], target_records_per_bin: int = 1_500_000) -> List[Tuple[int, int]]:
    """
    Simple adaptive binning - just calculates the right number of bins based on total records.
    Much simpler than the complex version!
    """
    parquet_handler = ParquetHandler()
    ddf = parquet_handler.read_parquet_dask(parquet_paths)
    logger = get_run_logger()
    logger.info("Converting uc_id to long for memory optimization...")
    ddf['uc_id'] = dd.to_numeric(ddf['uc_id'], errors='coerce').astype('int64')
    with RayDaskCallback():
        # Get total records and calculate optimal number of bins
        total_records = len(ddf)
        optimal_bins = max(1, round(total_records / target_records_per_bin))
        logger.info(f"Total records: {total_records:,}")
        logger.info(f"Target per bin: {target_records_per_bin:,}")
        logger.info(f"Calculated bins: {optimal_bins}")
        logger.info(f"Expected records per bin: {total_records / optimal_bins:,.0f}")
        # Use the same quantile approach as your original, just with calculated bin count
        quantiles = ddf['uc_id'].quantile([i / optimal_bins for i in range(optimal_bins + 1)]).compute().values
    bins = []
    for i in range(optimal_bins):
        start = int(quantiles[i])
        if i == optimal_bins - 1:
            # Last bin: include the maximum value
            end = int(quantiles[i + 1]) + 1
        else:
            # Other bins: exclusive end to avoid overlap
            end = int(quantiles[i + 1])
        bins.append((start, end))
    logger.info(f"Calculated uc_id bins: {bins}")
    del ddf
    gc.collect()
    return bins
Marvin
08/04/2025, 8:14 PM
Marvin
08/04/2025, 8:14 PM
python
from typing import List, Tuple

from prefect import task, get_run_logger
import dask
import dask.dataframe as dd
import gc
import ray
from ray.util.dask import RayDaskCallback
import numpy as np

@task(
    persist_result=False,
    cache_result_in_memory=False,
    retries=0  # Disable retries unless needed
)
def calculate_adaptive_bins(
    parquet_paths: List[str],
    target_records_per_bin: int = 1_500_000
) -> List[Tuple[int, int]]:
    """
    Calculate adaptive bins with improved memory management.
    """
    logger = get_run_logger()
    try:
        # Configure Dask for better memory handling
        with dask.config.set({
            'distributed.worker.memory.target': 0.6,     # Target 60% memory usage
            'distributed.worker.memory.spill': 0.7,      # Spill to disk at 70%
            'distributed.worker.memory.pause': 0.8,      # Pause processing at 80%
            'distributed.worker.memory.terminate': 0.95  # Terminate at 95%
        }):
            # Read only necessary columns
            ddf = dd.read_parquet(
                parquet_paths,
                columns=['uc_id'],
                engine='pyarrow'
            )
            logger.info("Converting uc_id to long for memory optimization...")
            ddf['uc_id'] = dd.to_numeric(ddf['uc_id'], errors='coerce').astype('int64')
            with RayDaskCallback():
                # Calculate total records in chunks to avoid memory spike
                total_records = ddf.shape[0].compute()
                optimal_bins = max(1, round(total_records / target_records_per_bin))
                logger.info(f"Total records: {total_records:,}")
                logger.info(f"Calculated bins: {optimal_bins}")
                # Calculate quantiles with better memory handling
                quantile_points = np.linspace(0, 1, optimal_bins + 1)
                # .values gives plain positional indexing below
                quantiles = ddf['uc_id'].quantile(quantile_points).compute().values
            # Clear ddf as soon as possible
            del ddf
            gc.collect()
            # Generate bins
            bins = [
                (
                    int(quantiles[i]),
                    int(quantiles[i + 1]) + 1 if i == optimal_bins - 1 else int(quantiles[i + 1])
                )
                for i in range(optimal_bins)
            ]
            logger.info(f"Calculated {len(bins)} uc_id bins")
            # Clear quantiles
            del quantiles
            gc.collect()
            return bins
    finally:
        # Cleanup
        ray.util.dask.shutdown_ray_dask()
        gc.collect()
2. Add a Ray memory monitor (create a new file memory_utils.py):
```python
import gc
import logging
from typing import Optional

import psutil
import ray

class RayMemoryMonitor:
    def __init__(self, threshold_gb: float = 45.0, logger: Optional[logging.Logger] = None):
        self.threshold_gb = threshold_gb
        self.logger = logger or logging.getLogger(__name__)

    def check_memory(self):
        process = psutil.Process()
        memory_gb = process.memory_info().rss / 1024 / 1024 / 1024
        if memory_gb > self.threshold_gb:
            self.logger.warning(f"High memory usage detected: {memory_gb:.2f} GB")
            # Get Ray memory usage
            ray_memory = ray.available_resources().get("memory", 0)
            self.logger.warning(f"Ray available memory: {ray_memory / 1024 / 1024 / 1024:.2f} GB")
            # Suggest garbage collection
            gc.collect()
```
Marvin
08/04/2025, 8:14 PM
3. *Configure Ray properly* before running your flow:
```python
@flow
def your_flow():
    # Configure Ray with memory limits
    ray.init(
        _memory=80 * 1024 * 1024 * 1024,  # 80GB max memory
        object_store_memory=40 * 1024 * 1024 * 1024,  # 40GB for object store
        _plasma_directory="/tmp",  # Use disk for spillover
        runtime_env={
            "env_vars": {
                "MALLOC_TRIM_THRESHOLD_": "65536"  # More aggressive memory trimming
            }
        }
    )
    try:
        memory_monitor = RayMemoryMonitor(threshold_gb=45.0)
        # Your existing code
        bins = calculate_adaptive_bins(parquet_paths, target_records_per_bin=1_500_000)
        # Monitor memory after bin calculation
        memory_monitor.check_memory()
        # Rest of your flow
        for min_uc_id, max_uc_id in bins:
            result = group_matched_components.submit(
                enricehd_parquet_files,
                min_uc_id=min_uc_id,
                max_uc_id=max_uc_id
            )
            # Process result
    finally:
        # Cleanup Ray
        ray.shutdown()
```
4. Additional Recommendations:
- Consider using Ray's object spilling feature to automatically move objects to disk when memory pressure is high:
python
ray.init(
    object_spilling_config={
        "type": "filesystem",
        "params": {
            "directory_path": "/path/to/spill"
        }
    }
)
- If possible, process your parquet files in chunks:
python
import pyarrow.parquet as pq

def count_records_in_chunks(parquet_paths):
    total = 0
    for path in parquet_paths:
        pf = pq.ParquetFile(path)
        total += pf.metadata.num_rows
    return total
- Monitor the Ray dashboard (typically at localhost:8265) during execution to track memory usage
The main causes of your memory retention might be:
1. Ray's object store holding onto references
2. Dask dataframe operations not releasing memory immediately
3. Python's memory fragmentation when dealing with large datasets
Would you like me to:
1. Provide more specific Ray configuration options for your use case?
2. Show how to implement chunked processing of the parquet files?
3. Explain how to set up more detailed memory monitoring?
Let me know which aspect you'd like me to elaborate on further.
Viktor
08/04/2025, 10:46 PM
Marvin
08/04/2025, 10:46 PM
Marvin
08/04/2025, 10:47 PM
python
from prefect.deployments import Deployment
from prefect.infrastructure import DockerContainer

# Define your Docker container with the environment variable
docker_container = DockerContainer(
    image="your-image:tag",
    env={
        "MALLOC_TRIM_THRESHOLD_": "65536"
    }
)

# Create your deployment
deployment = Deployment(
    name="your-deployment-name",
    flow=your_flow,
    infrastructure=docker_container,
)

# Apply the deployment
deployment.apply()
Or if you're using a YAML configuration file (deployment.yaml):
yaml
deployments:
  - name: your-deployment-name
    version: null
    tags: []
    description: null
    schedule: null
    flow_name: your_flow
    entrypoint: path/to/your/flow.py:your_flow
    parameters: {}
    work_pool:
      name: your-work-pool
      work_queue_name: default
    infrastructure:
      type: docker-container
      env:
        MALLOC_TRIM_THRESHOLD_: "65536"
The environment variable will be available to your flow when it runs in the Docker container. This should help with memory management by making memory trimming more aggressive.
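If you want to confirm the variable actually reaches the flow run, a minimal sketch (check_env_flow is just an illustrative name) is to log it from inside a flow:
python
import os
from prefect import flow, get_run_logger

@flow
def check_env_flow():
    logger = get_run_logger()
    # Logs the value the container actually sees, or a warning if it is missing.
    value = os.environ.get("MALLOC_TRIM_THRESHOLD_")
    if value is None:
        logger.warning("MALLOC_TRIM_THRESHOLD_ is not set in this environment")
    else:
        logger.info(f"MALLOC_TRIM_THRESHOLD_={value}")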
Would you like me to:
1. Show how to verify the environment variable is set correctly in your container?
2. Provide additional memory-related environment variables that might be helpful?
3. Explain how to combine this with other Docker container optimizations?
Viktor
08/04/2025, 10:52 PM
- name: "generate_creatordb-prod"
  entrypoint: "generate_creatordb_flow.py:generate_creatordb_flow"
  work_pool:
    name: main-pool
    job_variables:
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"
      env:
        prefect_results_default_storage_block: "s3-bucket/workflows-s3-cache"
        MALLOC_TRIM_THRESHOLD_: "0"
      volumes:
        - /prefect_data:/prefect_data
  tags:
    - prod
    - "{{ $PREFECT_DEPLOYMENT_IMAGE_TAG }}"
  description: "Production deployment for generate_creatordb (version: $PREFECT_DEPLOYMENT_IMAGE_TAG)"
  parameters:
    environment: "prod"
  schedule:
    cron: "0 2 * * 0"
    timezone: "Europe/Paris"
Marvin
08/04/2025, 10:52 PM
Marvin
08/04/2025, 10:53 PM
MALLOC_TRIM_THRESHOLD_ is properly placed under the env section of job_variables.
However, I notice you're setting it to "0" - this is different from the "65536" we discussed earlier. Let me explain the implications:
- `MALLOC_TRIM_THRESHOLD_=0`: Most aggressive setting - tells the allocator to try to release memory back to the OS immediately whenever possible. This can lead to:
- More frequent memory trimming operations
- Potentially better memory usage but with some CPU overhead
- Good for cases where memory pressure is high and CPU isn't a bottleneck
- `MALLOC_TRIM_THRESHOLD_=65536`: More balanced setting - only triggers memory trimming when the free memory exceeds 64KB. This:
- Reduces the frequency of trim operations
- Balances CPU usage with memory management
- Generally recommended for most use cases
Your configuration is valid and will work, but you might want to experiment with both values to see which works better for your specific workload. Here's the complete configuration with both options commented:
yaml
- name: "generate_creatordb-prod"
  entrypoint: "generate_creatordb_flow.py:generate_creatordb_flow"
  work_pool:
    name: main-pool
    job_variables:
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"
      env:
        prefect_results_default_storage_block: "s3-bucket/workflows-s3-cache"
        # Option 1: Most aggressive memory trimming (current setting)
        MALLOC_TRIM_THRESHOLD_: "0"
        # Option 2: Balanced memory trimming (uncomment to use)
        # MALLOC_TRIM_THRESHOLD_: "65536"
      volumes:
        - /prefect_data:/prefect_data
  tags:
    - prod
    - "{{ $PREFECT_DEPLOYMENT_IMAGE_TAG }}"
  description: "Production deployment for generate_creatordb (version: $PREFECT_DEPLOYMENT_IMAGE_TAG)"
  parameters:
    environment: "prod"
  schedule:
    cron: "0 2 * * 0"
    timezone: "Europe/Paris"
You might also want to consider adding these complementary memory-related environment variables:
yaml
env:
  prefect_results_default_storage_block: "s3-bucket/workflows-s3-cache"
  MALLOC_TRIM_THRESHOLD_: "0"
  # Optional additional memory-related settings
  PYTHONMALLOC: "malloc"  # Use system malloc instead of Python's
  MALLOC_ARENA_MAX: "2"   # Limit number of memory pools
  PYMALLOC_DEBUG: "1"     # Enable debug mode for memory allocations (development only)
Would you like me to:
1. Explain how to monitor the effectiveness of these memory settings?
2. Show how to add memory metrics collection to your flow?
3. Provide guidance on tuning these values based on your workload?