Skip to content

Instantly share code, notes, and snippets.

@arvidkahl
Last active March 1, 2025 22:04
Show Gist options
  • Save arvidkahl/9008a202d59dacbf91a650d7741a84dc to your computer and use it in GitHub Desktop.
redis_investigation.py
import redis
from collections import defaultdict
import json
from datetime import datetime, timedelta
import random
import time
def analyze_redis_memory(host='localhost', port=6379, db=1, pattern='*', sample_size=1000):
    """
    Analyze a sample of Redis keys and sort them by memory usage.

    Connects to the given Redis database, SCANs up to ``sample_size`` keys
    matching ``pattern``, and collects per-key memory usage, type, TTL, and
    a little type-specific detail (hash field counts, list lengths, etc.).

    Args:
        host: Redis server hostname.
        port: Redis server port.
        db: Redis logical database number.
        pattern: Glob-style pattern passed to SCAN MATCH.
        sample_size: Maximum number of keys to sample.

    Returns:
        (key_info, stats): ``key_info`` is a list of per-key dicts sorted by
        size (largest first); ``stats`` is a summary dict with totals,
        averages, and prefix/type distributions.
    """
    r = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    print(f"Connected to Redis db{db}, analyzing keys matching pattern: {pattern}")

    # Count total keys (for visibility and for extrapolating memory usage).
    total_keys = r.dbsize()
    print(f"Total keys in db{db}: {total_keys:,}")

    # SCAN incrementally instead of KEYS so large databases aren't blocked.
    keys = []
    cursor = 0
    scanned_count = 0
    print(f"Sampling {sample_size} keys...")
    while len(keys) < sample_size:
        cursor, batch = r.scan(cursor=cursor, match=pattern, count=1000)
        keys.extend(batch[:sample_size - len(keys)])  # Only take what we need
        scanned_count += len(batch)
        # Exit if we've scanned all keys (cursor wrapped to 0) or have enough.
        if cursor == 0 or len(keys) >= sample_size:
            break
        # Small delay to avoid overwhelming the server.
        time.sleep(0.01)
    print(f"Sampled {len(keys)} keys after scanning {scanned_count} keys")

    key_info = []
    prefix_counts = defaultdict(int)
    type_counts = defaultdict(int)
    total_memory = 0

    for i, key in enumerate(keys):
        if i % 100 == 0:
            print(f"Processing key {i}/{len(keys)}...")
        try:
            memory_usage = r.memory_usage(key)
            if memory_usage is None:  # Skip if we can't get memory usage
                continue

            ttl = r.ttl(key)
            # BUGFIX: check existence *before* accumulating statistics.
            # Previously, keys that had disappeared (ttl == -2) were still
            # counted in total_memory / prefix_counts / type_counts while
            # being excluded from key_info, inflating the average key size
            # and the whole-database memory estimate derived from it.
            if ttl == -2:  # -2 means the key no longer exists
                continue

            total_memory += memory_usage

            # Bucket keys by their prefix (text before the first colon).
            prefix = key.split(':', 1)[0] if ':' in key else key
            prefix_counts[prefix] += 1

            key_type = r.type(key)
            type_counts[key_type] += 1

            # Gather a little type-specific detail for context.
            type_specific_info = {}
            if key_type == 'hash':
                type_specific_info['fields'] = r.hlen(key)
                if type_specific_info['fields'] < 5 and len(key_info) < 50:
                    # For small hashes, get all fields to understand the data better
                    type_specific_info['sample_data'] = r.hgetall(key)
            elif key_type == 'list':
                type_specific_info['length'] = r.llen(key)
            elif key_type == 'set':
                type_specific_info['members'] = r.scard(key)
            elif key_type == 'zset':
                type_specific_info['members'] = r.zcard(key)
            elif key_type == 'string':
                if r.strlen(key) < 100:  # Only inline small string values
                    type_specific_info['value'] = r.get(key)

            expiry_info = "no expiry"
            if ttl != -1:  # -1 means no expiration set
                expiry_date = datetime.now() + timedelta(seconds=ttl)
                expiry_info = expiry_date.strftime('%Y-%m-%d %H:%M:%S')

            key_info.append({
                'key': key,
                'size_bytes': memory_usage,
                'size_mb': round(memory_usage / 1024 / 1024, 2),
                'type': key_type,
                'type_info': type_specific_info,
                'expiry': expiry_info
            })
        except redis.exceptions.ResponseError as e:
            print(f"Error processing key {key}: {str(e)}")
            continue

    # Sort by size (largest first).
    key_info.sort(key=lambda x: x['size_bytes'], reverse=True)

    # Average over successfully sampled keys; guard against an empty sample.
    avg_size = total_memory / len(key_info) if key_info else 0
    # Extrapolate from the sample to estimate whole-database memory.
    estimated_total_memory = avg_size * total_keys

    stats = {
        'total_keys': total_keys,
        'sampled_keys': len(key_info),
        'total_memory_sampled': total_memory,
        'avg_key_size': avg_size,
        'estimated_total_memory': estimated_total_memory,
        'prefix_counts': dict(sorted(prefix_counts.items(), key=lambda x: x[1], reverse=True)),
        'type_counts': dict(type_counts)
    }
    return key_info, stats
def get_recent_keys(r, keys, limit=50):
    """
    Return the most recently accessed keys, using OBJECT IDLETIME.

    Only the first 250 keys are probed, to keep the per-key round-trips
    from overloading the server. Results are sorted so the most recently
    touched keys come first, truncated to ``limit`` entries.
    """
    probe_set = keys[:250]  # cap the number of OBJECT IDLETIME calls
    idle_records = []
    for index, key in enumerate(probe_set):
        if index % 50 == 0:
            print(f"Checking idle time for key {index}/250...")
        try:
            idle = r.object('idletime', key)
        except redis.exceptions.ResponseError:
            continue
        if idle is None:
            continue
        idle_records.append({
            'key': key,
            'idle_seconds': idle,
            'last_accessed': datetime.now() - timedelta(seconds=idle),
        })
    # Lowest idle time first == most recently accessed first.
    idle_records.sort(key=lambda record: record['idle_seconds'])
    return idle_records[:limit]
def analyze_key_patterns(r, pattern='*', sample_size=1000):
    """
    Sample keys and group them by structural naming pattern.

    Keys containing colons are grouped by their first two segments
    ("prefix:sub:*"); keys without a colon are counted verbatim.

    Args:
        r: Connected redis client.
        pattern: Glob-style pattern passed to SCAN MATCH.
        sample_size: Maximum number of keys to sample.

    Returns:
        List of (pattern, count) tuples sorted by count, descending.
    """
    cursor = 0
    keys = []
    while len(keys) < sample_size:
        cursor, batch = r.scan(cursor=cursor, match=pattern, count=1000)
        keys.extend(batch[:sample_size - len(keys)])
        if cursor == 0 or len(keys) >= sample_size:
            break
        time.sleep(0.01)  # small delay so we don't overwhelm the server

    # Analyze key structure.
    pattern_counts = defaultdict(int)
    for key in keys:
        parts = key.split(':')
        if len(parts) > 1:
            # BUGFIX: the original nested a `len(parts) >= 2` check here whose
            # `else` branch was unreachable (always true when len(parts) > 1),
            # and it shadowed the `pattern` parameter with the loop-local name.
            key_pattern = f"{parts[0]}:{parts[1]}:*"
        else:
            key_pattern = key
        pattern_counts[key_pattern] += 1

    # Sort patterns by frequency, most common first.
    return sorted(pattern_counts.items(), key=lambda item: item[1], reverse=True)
def print_results(results, stats, recent_keys, limit=50):
    """
    Pretty-print the analysis: overall statistics, prefix/type
    distributions, the biggest keys, and recently accessed keys.
    """
    def _clip(full_key):
        # Truncate long keys so the table columns stay aligned.
        return full_key[:67] + "..." if len(full_key) > 67 else full_key

    print("\n===== REDIS MEMORY ANALYSIS =====\n")

    # Headline numbers for the whole database.
    print("OVERALL STATISTICS:")
    print(f"Total keys in database: {stats['total_keys']:,}")
    print(f"Sampled keys: {stats['sampled_keys']:,}")
    print(f"Average key size: {stats['avg_key_size']:.2f} bytes")
    print(f"Estimated total memory: {stats['estimated_total_memory']:,.2f} bytes ({stats['estimated_total_memory']/1024/1024/1024:.2f} GB)")

    sampled = stats['sampled_keys']

    # Top 10 key prefixes by frequency.
    print("\nKEY PREFIX DISTRIBUTION:")
    for prefix, count in list(stats['prefix_counts'].items())[:10]:
        print(f"{prefix}: {count} keys ({count/sampled*100:.1f}%)")

    # Breakdown by Redis value type.
    print("\nKEY TYPE DISTRIBUTION:")
    for key_type, count in stats['type_counts'].items():
        print(f"{key_type}: {count} keys ({count/sampled*100:.1f}%)")

    # Table of the biggest keys in the sample.
    print(f"\nTOP {limit} KEYS BY MEMORY USAGE:")
    print(f"{'Key':<70} {'Size':<10} {'Type':<10} {'Expires':<20} {'Additional Info'}")
    print("-" * 150)
    for entry in results[:limit]:
        raw_info = entry.get('type_info')
        if not raw_info:
            rendered_info = ''
        elif 'sample_data' in raw_info:
            # Summarise sample data instead of dumping the whole hash.
            summary = raw_info.copy()
            samples = summary.pop('sample_data', {})
            if samples:
                first_item = f"{list(samples.items())[0]}"
                suffix = ", ..." if len(samples) > 1 else ""
                summary['data_preview'] = first_item + suffix
            rendered_info = json.dumps(summary)
        else:
            rendered_info = json.dumps(raw_info)
        print(f"{_clip(entry['key']):<70} "
              f"{str(entry['size_mb']) + 'MB':<10} "
              f"{entry['type']:<10} "
              f"{entry['expiry']:<20} "
              f"{rendered_info}")

    # Table of the most recently touched keys.
    print("\nMOST RECENTLY ACCESSED KEYS:")
    print(f"{'Key':<70} {'Last Accessed':<25} {'Idle Time'}")
    print("-" * 110)
    for entry in recent_keys:
        idle_display = str(timedelta(seconds=entry['idle_seconds']))
        accessed_display = entry['last_accessed'].strftime('%Y-%m-%d %H:%M:%S')
        print(f"{_clip(entry['key']):<70} "
              f"{accessed_display:<25} "
              f"{idle_display}")
def print_key_patterns(patterns, limit=20):
    """
    Print the most common key patterns as a table, with each pattern's
    count and its share of all sampled keys.
    """
    print("\nMOST COMMON KEY PATTERNS:")
    print(f"{'Pattern':<70} {'Count':<10} {'Percentage'}")
    print("-" * 100)
    overall = sum(n for _, n in patterns)
    for key_pattern, n in patterns[:limit]:
        share = (n / overall) * 100
        print(f"{key_pattern:<70} {n:<10} {share:.2f}%")
if __name__ == "__main__":
    # db 1 is where most of the keys live.
    client = redis.Redis(host='localhost', port=6379, db=1, decode_responses=True)
    print("Starting Redis memory analysis...")

    # First pass: understand the key naming structure.
    print("\nAnalyzing key patterns...")
    common_patterns = analyze_key_patterns(client)
    print_key_patterns(common_patterns)

    # Second pass: per-key memory usage plus summary statistics.
    print("\nAnalyzing memory usage...")
    top_keys, summary = analyze_redis_memory(db=1)

    # Probe idle times only for the biggest keys found above.
    biggest_keys = [entry['key'] for entry in top_keys[:250]]
    recently_used = get_recent_keys(client, biggest_keys)

    print_results(top_keys, summary, recently_used)
    print("\nAnalysis complete!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment