CNPG Volume Snapshot Cleanup Kubernetes Cron Job
import json
import os
import re
from datetime import datetime, timezone, timedelta
import logging
import argparse
from kubernetes import client, config
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def get_k8s_client():
    """
    Get Kubernetes client configured for either in-cluster or local use.
    Tries in-cluster first, falls back to local kubeconfig.
    """
    try:
        # Try in-cluster configuration first
        config.load_incluster_config()
        logger.info("Using in-cluster Kubernetes configuration")
        return client.CustomObjectsApi()
    except Exception as e:
        logger.info(f"In-cluster config failed: {e}")
        try:
            # Fall back to local kubeconfig
            config.load_kube_config()
            logger.info("Using local kubeconfig for Kubernetes configuration")
            return client.CustomObjectsApi()
        except Exception as e:
            logger.error(f"Failed to initialize Kubernetes client: {e}")
            raise
def get_backups(namespace):
    """Get all backups in the specified namespace using the K8s API."""
    try:
        api = get_k8s_client()
        backups = api.list_namespaced_custom_object(
            group="postgresql.cnpg.io",
            version="v1",
            namespace=namespace,
            plural="backups",
        )
        return backups
    except Exception as e:
        logger.error(f"Failed to get backups: {e}")
        raise
def delete_backup(name, namespace):
    """Delete a specific backup using the K8s API."""
    try:
        api = get_k8s_client()
        api.delete_namespaced_custom_object(
            group="postgresql.cnpg.io",
            version="v1",
            namespace=namespace,
            plural="backups",
            name=name,
        )
        logger.info(f"Successfully deleted backup {name}")
    except Exception as e:
        logger.error(f"Failed to delete backup {name}: {e}")
def is_scheduled_backup(backup):
    """
    Determine if a backup was created by a schedule or manually.
    """
    # The scheduled-backup label is the definitive marker for scheduled backups
    labels = backup.get("metadata", {}).get("labels", {})
    if "cnpg.io/scheduled-backup" in labels:
        return True
    # Check annotations as a secondary indicator
    annotations = backup.get("metadata", {}).get("annotations", {})
    if "cnpg.io/schedule" in annotations or "cnpg.io/scheduledBackup" in annotations:
        return True
    # As a last resort, check the name pattern,
    # but only if none of the above checks passed
    name = backup.get("metadata", {}).get("name", "")
    if "-hourly-" in name or "-daily-" in name or "-weekly-" in name:
        # If the name ends with a timestamp like "20250318150000",
        # it is likely a scheduled backup
        if re.search(r"\d{14}$", name):
            return True
    return False
def should_delete_backup(
    backup, retention_days, max_failed_age_hours, include_manual=False
):
    """
    Determine if a backup should be deleted based on its status, age, and type.
    """
    # Check if this is a volume snapshot backup - ignore other types
    method = backup.get("spec", {}).get("method", "")
    if method != "volumeSnapshot":
        return False
    # Check if it's a scheduled backup or if we should include manual backups
    scheduled = is_scheduled_backup(backup)
    if not scheduled and not include_manual:
        return False
    creation_time = datetime.strptime(
        backup["metadata"]["creationTimestamp"], "%Y-%m-%dT%H:%M:%SZ"
    ).replace(tzinfo=timezone.utc)
    current_time = datetime.now(timezone.utc)
    backup_age = current_time - creation_time
    # Get status information
    status = backup.get("status", {})
    phase = status.get("phase", "unknown")
    # Check for "SnapshotsLimitReached" errors - prioritize these for deletion
    error_msg = status.get("error", "")
    if "SnapshotsLimitReached" in error_msg:
        # If we're hitting snapshot limits, be more aggressive with cleanup:
        # delete these after just 1 hour regardless of other conditions
        return backup_age > timedelta(hours=1)
    # Handle failed backups
    if phase == "failed":
        return backup_age > timedelta(hours=max_failed_age_hours)
    # Handle stuck pending backups (in 'started' phase)
    if phase == "started":
        # If a backup has been in 'started' state for more than 2 hours, consider it stuck
        return backup_age > timedelta(hours=2)
    # Handle completed backups
    if phase == "completed":
        return backup_age > timedelta(days=retention_days)
    # For backups without a phase (pending), keep them unless very old
    return backup_age > timedelta(days=1)
def cleanup_backups(
    namespace, retention_days, max_failed_age_hours, include_manual=False, dry_run=False
):
    """
    Clean up old and failed backups based on retention policy.
    """
    try:
        backups = get_backups(namespace)
        # Count the snapshots and sort by age (oldest first)
        # This way we can delete oldest snapshots first if we need to make space
        sorted_backups = sorted(
            backups["items"], key=lambda b: b["metadata"]["creationTimestamp"]
        )
        # Filter to only include volume snapshots for our counts
        volume_snapshots = [
            b
            for b in sorted_backups
            if b.get("spec", {}).get("method") == "volumeSnapshot"
        ]
        total_backups = len(sorted_backups)
        volume_snapshot_count = len(volume_snapshots)
        failed_snapshots = sum(
            1 for b in volume_snapshots if b.get("status", {}).get("phase") == "failed"
        )
        completed_snapshots = sum(
            1
            for b in volume_snapshots
            if b.get("status", {}).get("phase") == "completed"
        )
        logger.info(
            f"Found {total_backups} total backups, {volume_snapshot_count} volume snapshots"
        )
        logger.info(
            f"Volume snapshot status: {completed_snapshots} completed, {failed_snapshots} failed"
        )
        # Check for "SnapshotsLimitReached" errors
        limit_reached_errors = sum(
            1
            for b in volume_snapshots
            if "SnapshotsLimitReached" in b.get("status", {}).get("error", "")
        )
        if limit_reached_errors > 0:
            logger.warning(
                f"Found {limit_reached_errors} backups with SnapshotsLimitReached errors"
            )
        # Counters for summary reporting
        to_delete_count = 0
        to_keep_count = 0
        delete_by_category = {
            "failed": 0,
            "limit_reached": 0,
            "completed_old": 0,
            "started_stuck": 0,
            "other": 0,
        }
        # Store backups to delete for dry run summary
        backups_to_delete = []
        for backup in sorted_backups:
            name = backup["metadata"]["name"]
            scheduled = is_scheduled_backup(backup)
            backup_type = "scheduled" if scheduled else "manual"
            phase = backup.get("status", {}).get("phase", "unknown")
            method = backup.get("spec", {}).get("method", "unknown")
            error = backup.get("status", {}).get("error", "")
            # Skip non-volumeSnapshot backups in the logging to reduce noise
            if method != "volumeSnapshot":
                continue
            # Age calculation for logging
            creation_time = datetime.strptime(
                backup["metadata"]["creationTimestamp"], "%Y-%m-%dT%H:%M:%SZ"
            ).replace(tzinfo=timezone.utc)
            current_time = datetime.now(timezone.utc)
            age = current_time - creation_time
            age_str = f"{age.days}d {age.seconds // 3600}h"
            should_delete = should_delete_backup(
                backup, retention_days, max_failed_age_hours, include_manual
            )
            if should_delete:
                to_delete_count += 1
                # Categorize deletion reason for summary
                if phase == "failed":
                    if "SnapshotsLimitReached" in error:
                        delete_by_category["limit_reached"] += 1
                    else:
                        delete_by_category["failed"] += 1
                elif phase == "completed":
                    delete_by_category["completed_old"] += 1
                elif phase == "started":
                    delete_by_category["started_stuck"] += 1
                else:
                    delete_by_category["other"] += 1
                # Add to list for dry run summary
                backups_to_delete.append(
                    {
                        "name": name,
                        "age": age_str,
                        "type": backup_type,
                        "method": method,
                        "phase": phase,
                    }
                )
                if dry_run:
                    logger.debug(
                        f"Would delete {backup_type} backup {name} (type: {method}, phase: {phase}, age: {age_str})"
                    )
                else:
                    logger.info(
                        f"Deleting {backup_type} backup {name} (type: {method}, phase: {phase}, age: {age_str})"
                    )
                    delete_backup(name, namespace)
            else:
                to_keep_count += 1
                logger.debug(
                    f"Keeping {backup_type} backup {name} (type: {method}, phase: {phase}, age: {age_str})"
                )
        # Print summary
        if dry_run:
            logger.info(
                f"DRY RUN SUMMARY: Would delete {to_delete_count} backups and keep {to_keep_count} backups"
            )
            if to_delete_count > 0:
                logger.info(f" - Would delete by category: {delete_by_category}")
                logger.info(" - First 10 backups that would be deleted:")
                for i, backup in enumerate(backups_to_delete[:10]):
                    logger.info(
                        f" {i + 1}. {backup['name']} (type: {backup['method']}, phase: {backup['phase']}, age: {backup['age']})"
                    )
                if len(backups_to_delete) > 10:
                    logger.info(
                        f" ... and {len(backups_to_delete) - 10} more backups"
                    )
        else:
            logger.info(
                f"Deleted {to_delete_count} backups and kept {to_keep_count} backups"
            )
            if to_delete_count > 0:
                logger.info(f" - Deleted by category: {delete_by_category}")
    except Exception as e:
        logger.error(f"Error during backup cleanup: {e}")
        raise
def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description="Clean up old and failed PostgreSQL backups"
    )
    parser.add_argument(
        "--namespace",
        default=os.environ.get("NAMESPACE", "db"),
        help="Kubernetes namespace where backups are located (default: db)",
    )
    parser.add_argument(
        "--retention-days",
        type=int,
        default=int(os.environ.get("RETENTION_DAYS", "7")),
        help="Number of days to retain completed backups (default: 7)",
    )
    parser.add_argument(
        "--max-failed-age-hours",
        type=int,
        default=int(os.environ.get("MAX_FAILED_AGE_HOURS", "24")),
        help="Number of hours to retain failed backups (default: 24)",
    )
    parser.add_argument(
        "--include-manual",
        action="store_true",
        default=os.environ.get("INCLUDE_MANUAL", "false").lower() == "true",
        help="Include manual backups in cleanup (default: false)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=os.environ.get("DRY_RUN", "false").lower() == "true",
        help="Show what would be deleted without actually deleting (default: false)",
    )
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    args = parser.parse_args()
    # Set log level based on arguments
    if args.debug:
        logger.setLevel(logging.DEBUG)
    logger.info(f"Starting backup cleanup in namespace {args.namespace}")
    logger.info(f"Retention period: {args.retention_days} days")
    logger.info(f"Max failed backup age: {args.max_failed_age_hours} hours")
    logger.info(f"Include manual backups: {args.include_manual}")
    logger.info(f"Dry run: {args.dry_run}")
    cleanup_backups(
        args.namespace,
        args.retention_days,
        args.max_failed_age_hours,
        args.include_manual,
        args.dry_run,
    )
if __name__ == "__main__":
    main()
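A quick way to sanity-check the retention policy before deploying is to run the script directly against a local kubeconfig. The snippet below is a minimal sketch (not part of the original gist); it assumes the script above is saved as backup_cleanup.py, as the Dockerfile expects, and previews deletions without removing anything.
import backup_cleanup  # the module above, saved as backup_cleanup.py

# Dry run: log what would be deleted in the "db" namespace, but delete nothing.
backup_cleanup.cleanup_backups(
    namespace="db",
    retention_days=7,
    max_failed_age_hours=24,
    include_manual=False,
    dry_run=True,
)

# Equivalent CLI invocation (flags defined in main() above):
#   python backup_cleanup.py --namespace db --dry-run --debug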
FROM python:3.9-slim
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1
# Install the Kubernetes Python client
RUN pip install --no-cache-dir kubernetes==32.0.1
# Create a non-root user to run the application
RUN groupadd -g 1001 appuser && \
    useradd -r -u 1001 -g appuser appuser && \
    mkdir -p /home/appuser && \
    chown -R appuser:appuser /home/appuser
# Set working directory
WORKDIR /app
# Copy the backup cleanup script
COPY backup_cleanup.py /app/
# Set ownership of the application files
RUN chown -R appuser:appuser /app
# Switch to non-root user
USER 1001
# Command to run the script
ENTRYPOINT ["python", "/app/backup_cleanup.py", "--debug"]
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: cnpg-volume-snapshot-cleanup
rules:
  - apiGroups: ["postgresql.cnpg.io"]
    resources: ["backups"]
    verbs: ["get", "list", "delete"]
  - apiGroups: [""]
    resources: ["secrets"]
    verbs: ["get"]
    resourceNames: ["docker-registry-credentials"] # make sure the role can read your docker registry pull secret
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: cnpg-volume-snapshot-cleanup
  namespace: db
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: cnpg-volume-snapshot-cleanup-binding
subjects:
  - kind: ServiceAccount
    name: cnpg-volume-snapshot-cleanup
    namespace: db
roleRef:
  kind: ClusterRole
  name: cnpg-volume-snapshot-cleanup
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cnpg-backup-cleanup
  namespace: db
spec:
  schedule: "0 2 * * *"
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      ttlSecondsAfterFinished: 3600 # clean up the finished Job after an hour (to keep k8s/grafana clean)
      template:
        metadata:
          labels:
            app: cnpg-volume-snapshot-cleanup
            component: backup-cleanup
        spec:
          imagePullSecrets:
            - name: docker-registry-credentials
          serviceAccountName: cnpg-volume-snapshot-cleanup
          securityContext:
            runAsUser: 1001
            runAsGroup: 1001
            fsGroup: 1001
          containers:
            - name: backup-cleanup
              image: <$YOUR_PRIVATE_REGISTRY>/docker/cnpg-backup-cleanup:latest
              imagePullPolicy: Always
              env:
                - name: NAMESPACE
                  value: "db" # Namespace to clean up
                - name: RETENTION_DAYS
                  value: "7"
                - name: MAX_FAILED_AGE_HOURS
                  value: "24"
                - name: INCLUDE_MANUAL
                  value: "false" # Set to "true" to include manual backups
              resources:
                limits:
                  memory: "256Mi"
                  cpu: "500m"
                requests:
                  memory: "128Mi"
                  cpu: "250m"
              securityContext:
                allowPrivilegeEscalation: false
                capabilities:
                  drop:
                    - ALL
          restartPolicy: OnFailure
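To preview what the CronJob would delete before letting it run on schedule, a throwaway Job can reuse the same image and service account with the script's DRY_RUN environment variable set. The manifest below is a sketch, not part of the original gist; the registry placeholder and resource names mirror the manifests above.
apiVersion: batch/v1
kind: Job
metadata:
  name: cnpg-backup-cleanup-dry-run
  namespace: db
spec:
  ttlSecondsAfterFinished: 3600
  template:
    spec:
      imagePullSecrets:
        - name: docker-registry-credentials
      serviceAccountName: cnpg-volume-snapshot-cleanup
      containers:
        - name: backup-cleanup
          image: <$YOUR_PRIVATE_REGISTRY>/docker/cnpg-backup-cleanup:latest
          env:
            - name: NAMESPACE
              value: "db"
            - name: DRY_RUN
              value: "true" # the script only logs what it would delete
      restartPolicy: Never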