Skip to content

Instantly share code, notes, and snippets.

@dr2050
Created June 4, 2026 14:47
Show Gist options
  • Select an option

  • Save dr2050/eaea92435ac12bb4572e6560183a71ad to your computer and use it in GitHub Desktop.

Select an option

Save dr2050/eaea92435ac12bb4572e6560183a71ad to your computer and use it in GitHub Desktop.
Clean up your Codex history for the current repo. It is safe and prompts you clearly before deleting anything. Keeps the Codex app clean.
#!/usr/bin/env python3
import argparse
import json
import os
import re
import sqlite3
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
# Matches a rollout file name of the form
# "rollout-YYYY-MM-DDTHH-MM-SS-<36 chars of hex digits and dashes>.jsonl".
# Group 1 captures the 36-character identifier embedded in the name.
SESSION_ID_RE = re.compile(r"rollout-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}-([0-9a-f-]{36})\.jsonl$")
@dataclass(frozen=True)
class Session:
path: Path
session_id: str
cwd: Path
mtime_ns: int
title: str | None = None
thread_name: str | None = None
first_user_message: str | None = None
def parse_args():
    """Parse the command line and return the resulting namespace.

    The options are declared as a table of ``(flag, keyword arguments)``
    pairs and registered in that order, so ``--help`` lists them in the
    order written here.
    """
    summary = (
        "Delete Codex session files for a directory except the newest one "
        "by file modification time."
    )
    option_table = [
        (
            "--dir",
            {
                "default": ".",
                "help": "Project directory whose Codex sessions should be cleaned. Defaults to $PWD.",
            },
        ),
        (
            "--codex-home",
            {
                # The environment variable wins over the built-in location.
                "default": os.environ.get("CODEX_HOME", "~/.codex"),
                "help": "Codex home directory. Defaults to $CODEX_HOME or ~/.codex.",
            },
        ),
        (
            "--keep",
            {
                "type": int,
                "default": 1,
                "help": "Number of newest matching sessions to keep. Defaults to 1.",
            },
        ),
        (
            "--yes",
            {
                "action": "store_true",
                "help": "Skip the confirmation prompt.",
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "Print what would be deleted without deleting anything.",
            },
        ),
        (
            "--show-paths",
            {
                "action": "store_true",
                "help": "Show the session JSONL path under each listed session.",
            },
        ),
        (
            "--no-index-update",
            {
                "action": "store_true",
                "help": "Delete session files without removing their entries from session_index.jsonl.",
            },
        ),
        (
            "--no-state-update",
            {
                "action": "store_true",
                "help": "Delete session files without removing their rows from state_5.sqlite.",
            },
        ),
    ]

    cli = argparse.ArgumentParser(description=summary)
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def resolve_dir(path):
    """Return *path* as a ``Path`` with ``~`` expanded and then resolved."""
    expanded = Path(path).expanduser()
    return expanded.resolve()
def format_mtime_ns(mtime_ns):
    """Render a nanosecond timestamp as ``YYYY-MM-DD HH:MM`` in local time."""
    seconds = mtime_ns / 1_000_000_000
    moment = datetime.fromtimestamp(seconds)
    return moment.strftime("%Y-%m-%d %H:%M")
def clean_label(value):
    """Normalise a label for single-line display.

    The value is converted to ``str``, runs of whitespace (including
    newlines) are collapsed to single spaces, and the result is capped at
    96 characters, with the tail replaced by ``...`` when it is cut.

    Returns:
        The cleaned label, or ``None`` when *value* is falsy or contains
        nothing but whitespace.
    """
    if not value:
        return None
    label = " ".join(str(value).split())
    if not label:
        # Whitespace-only input collapses to "", which is "no label" too.
        return None
    if len(label) > 96:
        # 93 characters plus the three-character ellipsis stays within 96.
        label = label[:93].rstrip() + "..."
    return label
def session_id_from_path(path):
    """Derive a session id from a session file's name.

    Returns the identifier captured by ``SESSION_ID_RE`` when the file name
    matches that pattern; otherwise falls back to the file name without its
    final suffix.
    """
    found = SESSION_ID_RE.match(path.name)
    return found.group(1) if found else path.stem
def read_session_metadata(path):
    """Scan one session JSONL file and build a ``Session`` from it.

    Each line is parsed as JSON. Lines that are not valid JSON, whose
    top-level value is not an object, or whose ``payload`` is not an object
    are skipped. From the remaining payloads the function collects the
    session id, the working directory, the thread name and the first user
    message. Scanning stops early once cwd, thread name and first user
    message are all known.

    Args:
        path: Path of the session file to read.

    Returns:
        A ``Session``, or ``None`` when the file cannot be read or decoded,
        cannot be stat'ed, or never records a working directory.
    """
    session_id = session_id_from_path(path)
    cwd = None
    thread_name = None
    first_user_message = None
    try:
        with path.open("r", encoding="utf-8") as handle:
            for line in handle:
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(record, dict):
                    # Valid JSON that is not an object (number, list, ...).
                    continue
                payload = record.get("payload")
                if not isinstance(payload, dict):
                    continue
                # NOTE(review): any payload carrying a string "id" or "cwd"
                # overrides the value seen so far (last one wins). Confirm
                # that only session-level records carry these keys.
                payload_id = payload.get("id")
                if payload_id and isinstance(payload_id, str):
                    session_id = payload_id
                payload_cwd = payload.get("cwd")
                if payload_cwd and isinstance(payload_cwd, str):
                    cwd = resolve_dir(payload_cwd)
                payload_type = payload.get("type")
                if payload_type == "thread_name_updated" and payload.get("thread_name"):
                    thread_name = clean_label(payload["thread_name"])
                if payload_type == "user_message" and not first_user_message:
                    first_user_message = clean_label(payload.get("message"))
                if cwd and thread_name and first_user_message:
                    break
    except (OSError, UnicodeDecodeError) as error:
        # UnicodeDecodeError is a ValueError, not an OSError, so it has to be
        # listed explicitly for a badly encoded file to be skipped.
        print(f"Skipping unreadable session {path}: {error}", file=sys.stderr)
        return None
    if cwd is None:
        return None
    try:
        mtime_ns = path.stat().st_mtime_ns
    except OSError as error:
        # The file may have vanished between reading and stat'ing it.
        print(f"Skipping unreadable session {path}: {error}", file=sys.stderr)
        return None
    return Session(
        path=path,
        session_id=session_id,
        cwd=cwd,
        mtime_ns=mtime_ns,
        thread_name=thread_name,
        first_user_message=first_user_message,
    )
def read_latest_thread_names(index_path):
    """Map session ids to thread names from a JSONL index file.

    Every line is parsed as JSON. Lines that are not valid JSON or are not
    JSON objects are ignored. When an id appears more than once, the last
    occurrence in the file wins.

    Args:
        index_path: Path of the index file.

    Returns:
        A dict of ``{session_id: thread_name}``. It is empty when the file
        does not exist. When the file cannot be read or decoded, a warning
        is written to stderr and whatever was collected so far is returned,
        because these names are only used as display labels.
    """
    names = {}
    if not index_path.exists():
        return names
    try:
        with index_path.open("r", encoding="utf-8") as handle:
            for line in handle:
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(record, dict):
                    # e.g. a bare number or list on its own line.
                    continue
                session_id = record.get("id")
                thread_name = record.get("thread_name")
                if session_id and thread_name:
                    names[session_id] = thread_name
    except (OSError, UnicodeDecodeError) as error:
        print(f"Could not read Codex session index {index_path}: {error}", file=sys.stderr)
    return names
def read_thread_state(state_path):
    """Load display labels from the ``threads`` table of a SQLite database.

    The database is opened read-only. Each row contributes a dict with the
    cleaned ``title``, ``first_user_message`` and ``preview`` values. That
    dict is indexed both by the row's id and by its resolved rollout path.

    Args:
        state_path: Path of the SQLite database file.

    Returns:
        ``(by_id, by_path)``. Both are empty when the file does not exist.
        On a SQLite error a warning is written to stderr and whatever was
        collected so far is returned.
    """
    by_id = {}
    by_path = {}
    if not state_path.exists():
        return by_id, by_path
    query = "select id, rollout_path, title, first_user_message, preview from threads"
    # as_uri() percent-encodes characters such as "?", "#" and "%" that would
    # otherwise be read as URI syntax; it requires an absolute path.
    uri = f"{state_path.resolve().as_uri()}?mode=ro"
    try:
        connection = sqlite3.connect(uri, uri=True)
    except sqlite3.Error as error:
        print(f"Could not read Codex state database {state_path}: {error}", file=sys.stderr)
        return by_id, by_path
    try:
        for session_id, rollout_path, title, first_user_message, preview in connection.execute(query):
            info = {
                "title": clean_label(title),
                "first_user_message": clean_label(first_user_message),
                "preview": clean_label(preview),
            }
            if session_id:
                by_id[session_id] = info
            if rollout_path:
                by_path[str(resolve_dir(rollout_path))] = info
    except sqlite3.Error as error:
        print(f"Could not read Codex state database {state_path}: {error}", file=sys.stderr)
    finally:
        # A sqlite3 connection used as a context manager is not closed on
        # exit, so close it explicitly.
        connection.close()
    return by_id, by_path
def apply_labels(sessions, thread_names, state_by_id, state_by_path):
    """Return new ``Session`` objects enriched with display labels.

    For each session:

    * ``title`` comes from the state entry found by session id, else from
      the entry found by file path.
    * ``thread_name`` keeps the session's own value, else uses the entry in
      *thread_names* for its id.
    * ``first_user_message`` prefers the state entry by id, then the one by
      path, then the session's own value.

    The input sessions are not modified; the result has the same order.
    """
    labelled = []
    for item in sessions:
        info_by_id = state_by_id.get(item.session_id, {})
        info_by_path = state_by_path.get(str(item.path), {})

        title = info_by_id.get("title") or info_by_path.get("title")
        thread_name = item.thread_name or thread_names.get(item.session_id)
        first_user_message = (
            info_by_id.get("first_user_message")
            or info_by_path.get("first_user_message")
            or item.first_user_message
        )

        labelled.append(
            Session(
                path=item.path,
                session_id=item.session_id,
                cwd=item.cwd,
                mtime_ns=item.mtime_ns,
                title=title,
                thread_name=thread_name,
                first_user_message=first_user_message,
            )
        )
    return labelled
def find_sessions(sessions_root, target_dir):
    """Collect the sessions under *sessions_root* recorded for *target_dir*.

    Every ``rollout-*.jsonl`` file below *sessions_root* (searched
    recursively) is read. A file is included when its metadata could be
    read and its working directory equals *target_dir*.
    """
    return [
        candidate
        for rollout in sessions_root.rglob("rollout-*.jsonl")
        if (candidate := read_session_metadata(rollout)) and candidate.cwd == target_dir
    ]
def describe(session, show_path=False):
    """Build the listing text for one session.

    The text holds the formatted modification time, the first five
    characters of the session id and, when available, a label: the title,
    else the thread name, else the first user message. With *show_path*
    true, the session file path follows on a second line.
    """
    label = session.title or session.thread_name or session.first_user_message
    stamp = format_mtime_ns(session.mtime_ns)
    short_id = session.session_id[:5]
    suffix = f" {label}" if label else ""
    text = f" {stamp} {short_id}{suffix}"
    if not show_path:
        return text
    return f"{text}\n {session.path}"
def remove_deleted_sessions_from_index(index_path, deleted_ids):
    """Drop the entries of deleted sessions from a JSONL index file.

    Lines that are not valid JSON, or are not JSON objects, are kept
    unchanged. When at least one entry is removed, the original content is
    first saved next to the index with an extra ``.bak`` suffix and the
    index is then rewritten. When nothing matches, no file is written.

    Args:
        index_path: Path of the index file.
        deleted_ids: Collection of session ids whose entries should go.

    Returns:
        The number of removed entries; 0 when the file does not exist.
    """
    if not index_path.exists():
        return 0
    # Read line by line rather than with str.splitlines(): splitlines() also
    # breaks on characters such as U+2028, which may appear unescaped inside
    # a JSON string and would cut one record into two unparseable pieces.
    with index_path.open("r", encoding="utf-8") as handle:
        original = handle.readlines()
    kept = []
    removed = 0
    for line in original:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            kept.append(line)
            continue
        if isinstance(record, dict) and record.get("id") in deleted_ids:
            removed += 1
        else:
            # Covers non-matching entries and valid JSON that is not an object.
            kept.append(line)
    if removed:
        backup = index_path.with_suffix(index_path.suffix + ".bak")
        backup.write_text("".join(original), encoding="utf-8")
        index_path.write_text("".join(kept), encoding="utf-8")
    return removed
def backup_sqlite_database(source_path):
    """Copy a SQLite database to a sibling file with an extra ``.bak`` suffix.

    The copy is made with ``sqlite3.Connection.backup``. Both connections
    are closed explicitly: using a sqlite3 connection as a context manager
    only commits or rolls back, it does not close the connection.

    Args:
        source_path: ``Path`` of the database to copy.

    Returns:
        The path of the backup file.
    """
    backup_path = source_path.with_suffix(source_path.suffix + ".bak")
    source = sqlite3.connect(source_path)
    try:
        backup = sqlite3.connect(backup_path)
        try:
            source.backup(backup)
        finally:
            backup.close()
    finally:
        source.close()
    return backup_path
def remove_deleted_sessions_from_state(state_path, deleted_ids, deleted_paths):
    """Delete the ``threads`` rows of removed sessions from a SQLite database.

    A backup of the database is taken before anything is deleted. Rows are
    removed when their ``id`` is in *deleted_ids* or their ``rollout_path``
    is in *deleted_paths*. The deletes run in batches inside a single
    transaction, so a long list cannot exceed SQLite's limit on bound
    parameters, and a failure rolls everything back.

    Args:
        state_path: Path of the SQLite database file.
        deleted_ids: Collection of session ids to remove.
        deleted_paths: Collection of session file paths (strings) to remove.

    Returns:
        ``(removed_row_count, backup_path)``; ``(0, None)`` when the
        database file does not exist.

    Raises:
        sqlite3.Error: If the backup or a delete statement fails.
    """
    if not state_path.exists():
        return 0, None
    backup_path = backup_sqlite_database(state_path)

    # Column names are fixed literals; only the values are bound parameters.
    targets = (
        ("id", sorted(deleted_ids)),
        ("rollout_path", sorted(deleted_paths)),
    )
    if not any(values for _, values in targets):
        return 0, backup_path

    batch_size = 500  # comfortably below SQLite's historical limit of 999
    removed = 0
    connection = sqlite3.connect(state_path)
    try:
        with connection:  # commits on success, rolls back on error
            for column, values in targets:
                for start in range(0, len(values), batch_size):
                    batch = values[start:start + batch_size]
                    placeholders = ",".join("?" for _ in batch)
                    cursor = connection.execute(
                        f"delete from threads where {column} in ({placeholders})",
                        batch,
                    )
                    removed += cursor.rowcount
    finally:
        # The context manager above does not close the connection.
        connection.close()
    return removed, backup_path
def main():
    """Run the cleanup and return a process exit code.

    Steps:
        1. Parse arguments and validate the directories involved.
        2. Find the sessions recorded for the target directory, newest
           first, and split them into the ones to keep and the ones to
           delete.
        3. List both groups. Stop here for ``--dry-run`` or when there is
           nothing to delete.
        4. Unless ``--yes`` was given, require the user to type "destroy".
        5. Delete the session files, then remove the matching index entries
           and state rows for the files that were actually deleted.

    Returns:
        0 on success or when there is nothing to do, 1 when the user aborts
        or a session file could not be deleted, 2 on invalid arguments or
        missing directories.
    """
    args = parse_args()
    target_dir = resolve_dir(args.dir)
    codex_home = resolve_dir(args.codex_home)
    sessions_root = codex_home / "sessions"
    index_path = codex_home / "session_index.jsonl"
    state_path = codex_home / "state_5.sqlite"

    if args.keep < 1:
        print("--keep must be at least 1", file=sys.stderr)
        return 2
    if not target_dir.is_dir():
        print(f"Not a directory: {target_dir}", file=sys.stderr)
        return 2
    if not sessions_root.is_dir():
        print(f"Codex sessions directory not found: {sessions_root}", file=sys.stderr)
        return 2

    state_by_id, state_by_path = read_thread_state(state_path)
    thread_names = read_latest_thread_names(index_path)
    sessions = apply_labels(
        find_sessions(sessions_root, target_dir),
        thread_names,
        state_by_id,
        state_by_path,
    )
    # Newest first; the file name breaks ties between equal timestamps.
    sessions.sort(key=lambda session: (session.mtime_ns, session.path.name), reverse=True)
    kept = sessions[: args.keep]
    doomed = sessions[args.keep :]

    print(f"Project directory: {target_dir}")
    print(f"Codex sessions: {sessions_root}")
    if not sessions:
        print("No Codex sessions found for this directory.")
        return 0

    print("\nKeeping:")
    for session in kept:
        print(describe(session, args.show_paths))
    if not doomed:
        print("\nNothing to delete.")
        return 0

    print("\nDeleting:")
    for session in doomed:
        print(describe(session, args.show_paths))
    if args.dry_run:
        print("\nDry run only. Nothing deleted.")
        return 0

    if not args.yes:
        try:
            answer = input('\nType "destroy" to delete these Codex sessions: ')
        except EOFError:
            # No interactive input available: treat it as a refusal.
            print()
            answer = ""
        if answer != "destroy":
            print("Aborted. Nothing deleted.")
            return 1

    # Delete file by file so one failure does not leave the index and state
    # database out of step with the files that were already removed.
    deleted = []
    survivors = list(kept)
    for session in doomed:
        try:
            session.path.unlink(missing_ok=True)
        except OSError as error:
            print(f"Could not delete {session.path}: {error}", file=sys.stderr)
            survivors.append(session)
        else:
            deleted.append(session)
    failures = len(doomed) - len(deleted)

    # Never drop metadata for an id that still belongs to a surviving file.
    surviving_ids = {session.session_id for session in survivors}
    deleted_ids = {session.session_id for session in deleted} - surviving_ids
    deleted_paths = {str(session.path) for session in deleted}

    removed_index_entries = 0
    if deleted and not args.no_index_update:
        removed_index_entries = remove_deleted_sessions_from_index(
            index_path, deleted_ids
        )
    removed_state_rows = 0
    state_backup_path = None
    if deleted and not args.no_state_update:
        removed_state_rows, state_backup_path = remove_deleted_sessions_from_state(
            state_path, deleted_ids, deleted_paths
        )

    print(f"\nDeleted {len(deleted)} session file{'s' if len(deleted) != 1 else ''}.")
    if not args.no_index_update:
        print(f"Removed {removed_index_entries} session_index.jsonl entr{'ies' if removed_index_entries != 1 else 'y'}.")
        if removed_index_entries:
            print(f"Index backup: {index_path}.bak")
    if not args.no_state_update:
        print(f"Removed {removed_state_rows} state_5.sqlite thread row{'s' if removed_state_rows != 1 else ''}.")
        if state_backup_path:
            print(f"State backup: {state_backup_path}")
    if failures:
        print(
            f"Failed to delete {failures} session file{'s' if failures != 1 else ''}.",
            file=sys.stderr,
        )
        return 1
    return 0
# Run the tool only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment