@keif
Last active August 5, 2025 17:35
This script fetches all GitHub notifications, checks if their associated issues or pull requests are closed/merged, and marks them as done by deleting them via the GitHub API.
#!/usr/bin/env python3
"""
GitHub Notifications Cleanup Script

Fetches all GitHub notifications, checks if their associated issues/PRs are closed/merged,
and marks them as done by deleting them via the GitHub API.
"""
import argparse
import logging
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Dict, List, Optional, Tuple

import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

class SubjectType(Enum):
    """Enum for GitHub notification subject types"""

    PULL_REQUEST = "PullRequest"
    ISSUE = "Issue"
    RELEASE = "Release"
    COMMIT = "Commit"
    DISCUSSION = "Discussion"
    OTHER = "Other"


@dataclass
class NotificationStats:
    """Statistics for the cleanup operation"""

    total: int = 0
    processed: int = 0
    deleted: int = 0
    errors: int = 0
    skipped: int = 0

class GitHubNotificationCleaner:
    """Main class for managing GitHub notification cleanup"""

    GITHUB_API_URL = "https://api.github.com"
    DEFAULT_MAX_WORKERS = 5
    DEFAULT_PER_PAGE = 100
    RATE_LIMIT_THRESHOLD = 100  # Warn if fewer than this many requests remain

    def __init__(
        self, token: str, dry_run: bool = False, max_workers: int = DEFAULT_MAX_WORKERS
    ):
        """
        Initialize the GitHub notification cleaner.

        Args:
            token: GitHub API token
            dry_run: If True, don't actually delete notifications
            max_workers: Maximum number of parallel workers
        """
        self.token = token
        self.dry_run = dry_run
        self.max_workers = max_workers
        self.session = self._create_session()
        self.stats = NotificationStats()

    def _create_session(self) -> requests.Session:
        """Create a requests session with proper headers and retry configuration"""
        session = requests.Session()
        session.headers.update(
            {
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
            }
        )
        # Add retry logic for transient failures
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry

        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        return session

    def check_rate_limit(self) -> Tuple[int, int]:
        """
        Check GitHub API rate limit status.

        Returns:
            Tuple of (remaining requests, reset timestamp)
        """
        try:
            response = self.session.get(f"{self.GITHUB_API_URL}/rate_limit")
            response.raise_for_status()
            data = response.json()
            # The core REST rate limit lives under resources.core
            core = data["resources"]["core"]
            return core["remaining"], core["reset"]
        except Exception as e:
            logger.warning(f"Failed to check rate limit: {e}")
            return -1, -1

    def wait_for_rate_limit_reset(self, reset_timestamp: int) -> None:
        """Wait until rate limit resets"""
        reset_time = datetime.fromtimestamp(reset_timestamp)
        wait_seconds = (reset_time - datetime.now()).total_seconds()
        if wait_seconds > 0:
            logger.info(
                f"Rate limit exceeded. Waiting {wait_seconds:.0f} seconds until reset..."
            )
            time.sleep(wait_seconds + 1)  # Add 1 second buffer

    def get_all_notifications(
        self, only_participating: bool = False, since: Optional[datetime] = None
    ) -> List[Dict]:
        """
        Fetch all notifications with pagination and filtering options.

        Args:
            only_participating: Only fetch notifications where user is directly participating
            since: Only fetch notifications updated after this time

        Returns:
            List of notification dictionaries
        """
        notifications = []
        params = {"all": "true", "per_page": self.DEFAULT_PER_PAGE}
        if only_participating:
            params["participating"] = "true"
        if since:
            # GitHub expects an ISO 8601 UTC timestamp; convert naive/local times to UTC
            params["since"] = since.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        url = f"{self.GITHUB_API_URL}/notifications"
        while url:
            try:
                # Check rate limit before making request
                remaining, reset = self.check_rate_limit()
                if 0 < remaining < self.RATE_LIMIT_THRESHOLD:
                    logger.warning(
                        f"Approaching rate limit ({remaining} requests remaining)"
                    )
                    if remaining < 10:
                        self.wait_for_rate_limit_reset(reset)
                response = self.session.get(
                    url, params=params if not notifications else None
                )
                response.raise_for_status()
                batch = response.json()
                notifications.extend(batch)
                logger.debug(f"Fetched {len(batch)} notifications from page")
                # Get next page URL from Link header
                url = self._get_next_page_url(response.headers.get("Link"))
                params = None  # Clear params for subsequent requests
            except requests.exceptions.RequestException as e:
                logger.error(f"Error fetching notifications: {e}")
                break
        logger.info(f"Fetched {len(notifications)} total notifications")
        return notifications

    def _get_next_page_url(self, link_header: Optional[str]) -> Optional[str]:
        """Parse the Link header to get the next page URL"""
        if not link_header:
            return None
        links = link_header.split(", ")
        for link in links:
            if 'rel="next"' in link:
                return link[link.find("<") + 1 : link.find(">")]
        return None

    def is_closed_or_merged(self, notification: Dict) -> bool:
        """
        Check if a notification's associated issue/PR is closed or merged.

        Args:
            notification: Notification dictionary

        Returns:
            True if the associated item is closed/merged
        """
        subject = notification.get("subject", {})
        subject_type = subject.get("type")
        api_url = subject.get("url")
        if not api_url:
            logger.debug(f"No API URL for notification {notification.get('id')}")
            return False
        try:
            response = self.session.get(api_url)
            # Handle 404 - the issue/PR might have been deleted
            if response.status_code == 404:
                logger.info(
                    f"Issue/PR not found (404) for notification {notification.get('id')}"
                )
                return True  # Consider deleted items as "closed"
            response.raise_for_status()
            data = response.json()
            # Check based on subject type
            if subject_type == SubjectType.PULL_REQUEST.value:
                return (
                    data.get("merged_at") is not None or data.get("state") == "closed"
                )
            elif subject_type == SubjectType.ISSUE.value:
                return data.get("state") == "closed"
            elif subject_type == SubjectType.RELEASE.value:
                return True  # Releases are always "done" once created
            elif subject_type == SubjectType.DISCUSSION.value:
                return (
                    data.get("state") == "closed"
                    or data.get("answer_chosen_at") is not None
                )
            # For other types, we can't determine if they're "done"
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Error checking status for {api_url}: {e}")
            return False

    def delete_notification(self, thread_id: str) -> bool:
        """
        Delete a notification (mark as done).

        Args:
            thread_id: The notification thread ID

        Returns:
            True if successfully deleted
        """
        if self.dry_run:
            logger.info(f"[DRY RUN] Would delete notification {thread_id}")
            return True
        url = f"{self.GITHUB_API_URL}/notifications/threads/{thread_id}"
        try:
            response = self.session.delete(url)
            if response.status_code in [204, 205]:  # 204 No Content, 205 Reset Content
                logger.info(f"✅ Deleted notification {thread_id}")
                return True
            else:
                logger.error(
                    f"❌ Failed to delete {thread_id} (HTTP {response.status_code})"
                )
                return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Error deleting notification {thread_id}: {e}")
            return False

    def process_notification(self, notification: Dict) -> bool:
        """
        Process a single notification (check if closed/merged, then delete).

        Args:
            notification: Notification dictionary

        Returns:
            True if notification was deleted
        """
        thread_id = notification["id"]
        title = notification["subject"]["title"]
        subject_type = notification["subject"].get("type", "Unknown")
        logger.debug(f"Processing {subject_type}: {title[:50]}...")
        try:
            if self.is_closed_or_merged(notification):
                if self.delete_notification(thread_id):
                    self.stats.deleted += 1
                    return True
                self.stats.errors += 1
            else:
                self.stats.skipped += 1
                logger.debug(f"Skipped {thread_id} (still open)")
            return False
        except Exception as e:
            logger.error(f"Unexpected error processing {thread_id}: {e}")
            self.stats.errors += 1
            return False
        finally:
            # Count every notification as processed, including successful deletions
            self.stats.processed += 1

    def cleanup_notifications(self, notifications: List[Dict]) -> NotificationStats:
        """
        Process all notifications in parallel.

        Args:
            notifications: List of notification dictionaries

        Returns:
            Statistics about the cleanup operation
        """
        self.stats.total = len(notifications)
        if not notifications:
            logger.info("No notifications to process")
            return self.stats
        logger.info(
            f"Processing {len(notifications)} notifications with {self.max_workers} workers..."
        )
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.process_notification, notif): notif
                for notif in notifications
            }
            # Process with progress updates
            for i, future in enumerate(as_completed(futures), 1):
                try:
                    future.result()
                    if i % 10 == 0:  # Progress update every 10 notifications
                        logger.info(f"Progress: {i}/{len(notifications)} processed")
                except Exception as e:
                    logger.error(f"Worker exception: {e}")
                    self.stats.errors += 1
        return self.stats

    def print_summary(self) -> None:
        """Print a summary of the cleanup operation"""
        logger.info("=" * 50)
        logger.info("CLEANUP SUMMARY")
        logger.info("=" * 50)
        logger.info(f"Total notifications found: {self.stats.total}")
        logger.info(f"Notifications processed: {self.stats.processed}")
        logger.info(f"Notifications deleted: {self.stats.deleted}")
        logger.info(f"Notifications skipped (still open): {self.stats.skipped}")
        logger.info(f"Errors encountered: {self.stats.errors}")
        if self.dry_run:
            logger.info("[DRY RUN MODE - No actual deletions were made]")

def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(
        description="Clean up GitHub notifications for closed/merged issues and PRs"
    )
    parser.add_argument(
        "--token",
        help="GitHub API token (or set GITHUB_TOKEN env var)",
        default=os.environ.get("GITHUB_TOKEN"),
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Run without actually deleting notifications",
    )
    parser.add_argument(
        "--workers", type=int, default=5, help="Number of parallel workers (default: 5)"
    )
    parser.add_argument(
        "--participating",
        action="store_true",
        help="Only process notifications where you are directly participating",
    )
    parser.add_argument(
        "--since-days", type=int, help="Only process notifications from the last N days"
    )
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
    return parser.parse_args()

def main():
    """Main entry point"""
    args = parse_arguments()
    # Set logging level
    if args.verbose:
        logger.setLevel(logging.DEBUG)
    # Validate token
    if not args.token:
        logger.error(
            "Missing GitHub token. Set GITHUB_TOKEN environment variable or use --token"
        )
        sys.exit(1)
    # Calculate since date if specified
    since = None
    if args.since_days:
        since = datetime.now() - timedelta(days=args.since_days)
        logger.info(f"Processing notifications since {since.date()}")
    # Create cleaner instance
    cleaner = GitHubNotificationCleaner(
        token=args.token, dry_run=args.dry_run, max_workers=args.workers
    )
    try:
        # Fetch notifications
        notifications = cleaner.get_all_notifications(
            only_participating=args.participating, since=since
        )
        # Process notifications
        if notifications:
            cleaner.cleanup_notifications(notifications)
        # Print summary
        cleaner.print_summary()
    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        cleaner.print_summary()
        sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
keif commented Aug 5, 2025

Basic usage

python mark_as_done.py

Dry run to see what would be deleted

python mark_as_done.py --dry-run

Only process notifications from last 7 days

python mark_as_done.py --since-days 7

Only process notifications where you're directly participating

python mark_as_done.py --participating

Verbose mode for debugging

python mark_as_done.py --verbose

Adjust parallel workers for rate limit concerns

python mark_as_done.py --workers 3
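
Pass the token explicitly and combine flags, e.g. a dry run over the last 30 days of participating notifications (replace <your_token> with a real token)

python mark_as_done.py --token <your_token> --participating --since-days 30 --dry-run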

keif commented Aug 5, 2025

Rewrote this gist.

  1. Better Error Handling & Resilience
  • Added automatic retry logic for transient failures (429, 5xx errors)
  • Handles 404 errors gracefully (deleted issues/PRs)
  • Uses a session with connection pooling for better performance
  • More thorough exception handling throughout
  2. Rate Limiting Protection
  • Monitors GitHub API rate limits before making requests
  • Automatically waits when the rate limit is exceeded
  • Configurable threshold to warn before hitting limits
  3. Enhanced Functionality (see the sketch after this list for driving these options from Python)
  • Dry run mode: Test without actually deleting notifications
  • Filtering options: Process only participating notifications or recent ones
  • More notification types: Handles Releases, Discussions, etc.
  • Progress tracking: Shows progress for large batches
  4. Improved Logging
  • Structured logging with timestamps
  • Multiple log levels (DEBUG, INFO, WARNING, ERROR)
  • Verbose mode for debugging
  • Clear summary statistics at the end
  5. Command-Line Interface
  • Added usage examples
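
If you'd rather drive the cleaner from another script than through the CLI, here is a minimal sketch. It assumes the gist is saved locally as mark_as_done.py (the filename used in the usage examples above) and that GITHUB_TOKEN is set in the environment:

from datetime import datetime, timedelta, timezone
import os

from mark_as_done import GitHubNotificationCleaner

# Dry run: nothing is deleted while dry_run=True
cleaner = GitHubNotificationCleaner(
    token=os.environ["GITHUB_TOKEN"],
    dry_run=True,
    max_workers=3,
)

# Only participating notifications updated in the last 14 days
notifications = cleaner.get_all_notifications(
    only_participating=True,
    since=datetime.now(timezone.utc) - timedelta(days=14),
)
cleaner.cleanup_notifications(notifications)
cleaner.print_summary()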
