Skip to content

Instantly share code, notes, and snippets.

@khaneliman
Created April 16, 2026 20:15
Show Gist options
  • Select an option

  • Save khaneliman/926cb8981bb1ce2716d2c718b628cbe7 to your computer and use it in GitHub Desktop.

Select an option

Save khaneliman/926cb8981bb1ce2716d2c718b628cbe7 to your computer and use it in GitHub Desktop.
Local script for analyzing vim/kakoune plugin release lag
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p python3 python3Packages.gitpython python3Packages.packaging
import argparse
import csv
import importlib
import os
import sys
from contextlib import nullcontext
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from multiprocessing.dummy import Pool
from pathlib import Path
from typing import TextIO
SCRIPT_ROOT = Path(__file__).resolve().parents[1]
REPO_ROOT = Path(__file__).resolve().parents[6]
sys.path.insert(0, str(SCRIPT_ROOT / "src"))
npu = importlib.import_module("nixpkgs_plugin_update")
@dataclass(frozen=True)
class EditorConfig:
name: str
input_file: Path
get_plugins: str
@dataclass(frozen=True)
class CurrentPlugin:
name: str
version: str
commit: str
tag: str | None
@dataclass(frozen=True)
class ReportRow:
name: str
mode: str
repo: str
current_version: str
current_ref: str
latest_tag: str | None
release_version: str | None
release_ref: str | None
release_date: str | None
current_date: str | None
head_ref: str
head_date: str
current_vs_release_days: int | None
head_vs_release_days: int | None
head_vs_current_days: int | None
nixpkgs_vs_release_days: int | None
status: str
recommendation: str
TABLE_HEADERS = [
"name",
"mode",
"status",
"recommendation",
"current_version",
"latest_tag",
"release_date",
"head_date",
"head_vs_release_days",
]
MARKDOWN_HEADERS = [
"Plugin",
"Mode",
"Status",
"Recommendation",
"Latest Tag",
"Release Date",
"In Nixpkgs Date",
"Nixpkgs ahead (days)",
"HEAD Date",
"HEAD ahead (days)",
]
RELEASE_RECOMMENDATIONS = {
"Keep release",
"Keep release (no tags)",
"Keep release (unusable tag)",
}
def build_kakoune_get_plugins_expr() -> str:
root = REPO_ROOT / "pkgs/applications/editors/kakoune/plugins"
return f"""with import <localpkgs> {{ }};
let
inherit (kakouneUtils.override {{ }}) buildKakounePluginFrom2Nix;
generated = callPackage {root}/generated.nix {{ inherit buildKakounePluginFrom2Nix; }};
hasChecksum =
value:
lib.isAttrs value
&& lib.hasAttrByPath [
"src"
"outputHash"
] value;
parse = _name: value: {{
pname = value.pname;
version = value.version;
homePage = value.meta.homepage or null;
checksum =
if hasChecksum value then
{{
submodules = value.src.fetchSubmodules or false;
sha256 = value.src.outputHash;
rev = value.src.rev;
tag = value.src.tag or null;
}}
else
null;
}};
in
lib.mapAttrs parse (removeAttrs generated [ "__unfix__" "override" "overrideDerivation" ])"""
def get_editor_config(editor: str) -> EditorConfig:
if editor == "vim":
return EditorConfig(
name="vim",
input_file=REPO_ROOT
/ "pkgs/applications/editors/vim/plugins/vim-plugin-names",
get_plugins=(
REPO_ROOT / "pkgs/applications/editors/vim/plugins/utils/get-plugins.nix"
).read_text(),
)
if editor == "kakoune":
return EditorConfig(
name="kakoune",
input_file=REPO_ROOT
/ "pkgs/applications/editors/kakoune/plugins/kakoune-plugin-names",
get_plugins=build_kakoune_get_plugins_expr(),
)
def load_current_plugins(
config: npu.FetchConfig, editor_config: EditorConfig, nixpkgs: str
) -> dict[str, CurrentPlugin]:
data = npu.run_nix_expr(editor_config.get_plugins, nixpkgs)
current_plugins = {}
for attr in data.values():
checksum = attr["checksum"]
if checksum is None:
continue
name = attr["pname"].replace(".", "-")
current_plugins[name] = CurrentPlugin(
name=name,
version=attr["version"],
commit=checksum["rev"],
tag=checksum.get("tag"),
)
return current_plugins
def normalize_datetime(value: datetime | None) -> datetime | None:
if value is None:
return None
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc)
def days_between(newer: datetime | None, older: datetime | None) -> int | None:
if newer is None or older is None:
return None
normalized_newer = normalize_datetime(newer)
normalized_older = normalize_datetime(older)
assert normalized_newer is not None and normalized_older is not None
return (normalized_newer.date() - normalized_older.date()).days
def iso_date(value: datetime | None) -> str | None:
if value is None:
return None
normalized = normalize_datetime(value)
assert normalized is not None
return normalized.date().isoformat()
def current_mode(branch: str) -> str:
if branch == "":
return "release"
if branch == "HEAD":
return "head"
return "explicit"
def classify_plugin(
mode: str,
latest_tag: str | None,
release_version: str | None,
head_vs_release_days: int | None,
) -> tuple[str, str]:
if latest_tag is None:
return "no-tag", "Keep release (no tags)"
if release_version is None:
return "unusable-tag", "Keep release (unusable tag)"
if mode == "head":
if head_vs_release_days == 0:
return "head", "Move to Release (up-to-date)"
if head_vs_release_days is not None and head_vs_release_days < 60:
return "head", "Consider Release (recent tag)"
return "head", "Keep HEAD"
if mode == "explicit":
return "explicit", "Keep explicit"
if head_vs_release_days == 0:
return "current-release", "Keep release"
status = (
"stale-release"
if head_vs_release_days is not None and head_vs_release_days > 365
else "recent-release"
)
if head_vs_release_days is None:
return status, "Keep"
if head_vs_release_days > 365:
return status, "Pin HEAD (stale release)"
if head_vs_release_days > 180:
return status, "Check HEAD"
return status, "Keep release"
def analyze_plugin(
plugin_desc: npu.PluginDesc,
current_plugins: dict[str, CurrentPlugin],
) -> ReportRow | None:
name = plugin_desc.name.replace(".", "-")
current = current_plugins.get(name)
if current is None:
return None
mode = current_mode(plugin_desc.branch)
head_ref, head_date = plugin_desc.repo.latest_commit()
latest_tag = plugin_desc.repo.get_latest_tag()
release_version = (
npu.normalize_release_version(latest_tag) if latest_tag is not None else None
)
release_ref = None
release_date = None
if latest_tag is not None and release_version is not None:
release_ref, release_date = plugin_desc.repo.resolve_tag(latest_tag)
current_date, _ = npu.Plugin.parse_version_string(current.version)
if current_date is None and current.tag is not None and current.tag == latest_tag:
current_date = release_date
head_vs_release_days = days_between(head_date, release_date)
current_vs_release_days = days_between(current_date, release_date)
head_vs_current_days = days_between(head_date, current_date)
nixpkgs_vs_release_days = days_between(current_date, release_date)
status, recommendation = classify_plugin(
mode=mode,
latest_tag=latest_tag,
release_version=release_version,
head_vs_release_days=head_vs_release_days,
)
return ReportRow(
name=name,
mode=mode,
repo=plugin_desc.repo.uri,
current_version=current.version,
current_ref=current.tag or current.commit,
latest_tag=latest_tag,
release_version=release_version,
release_ref=release_ref,
release_date=iso_date(release_date),
current_date=iso_date(current_date),
head_ref=head_ref,
head_date=iso_date(head_date) or "",
current_vs_release_days=current_vs_release_days,
head_vs_release_days=head_vs_release_days,
head_vs_current_days=head_vs_current_days,
nixpkgs_vs_release_days=nixpkgs_vs_release_days,
status=status,
recommendation=recommendation,
)
def analyze_plugin_safe(
plugin_desc: npu.PluginDesc,
current_plugins: dict[str, CurrentPlugin],
) -> ReportRow | None:
try:
return analyze_plugin(plugin_desc, current_plugins)
except Exception as exc:
name = plugin_desc.name.replace(".", "-")
current = current_plugins.get(name)
current_ref = ""
current_version = ""
if current is not None:
current_ref = current.tag or current.commit
current_version = current.version
return ReportRow(
name=name,
mode=current_mode(plugin_desc.branch),
repo=plugin_desc.repo.uri,
current_version=current_version,
current_ref=current_ref,
latest_tag=None,
release_version=None,
release_ref=None,
release_date=None,
current_date=None,
head_ref="",
head_date="",
current_vs_release_days=None,
head_vs_release_days=None,
head_vs_current_days=None,
nixpkgs_vs_release_days=None,
status=f"error:{exc.__class__.__name__}",
recommendation="Error",
)
def format_cell(value: str | int | None) -> str:
return "" if value is None else str(value)
def print_table(rows: list[ReportRow], output: TextIO) -> None:
table = [
[
row.name,
row.mode,
row.status,
row.recommendation,
row.current_version,
row.latest_tag,
row.release_date,
row.head_date,
row.head_vs_release_days,
]
for row in rows
]
widths = [
max(len(header), *(len(format_cell(row[idx])) for row in table))
for idx, header in enumerate(TABLE_HEADERS)
]
print(
" ".join(header.ljust(widths[idx]) for idx, header in enumerate(TABLE_HEADERS)),
file=output,
)
print(" ".join("-" * width for width in widths), file=output)
for row in table:
print(
" ".join(
format_cell(cell).ljust(widths[idx]) for idx, cell in enumerate(row)
),
file=output,
)
def print_markdown(rows: list[ReportRow], output: TextIO) -> None:
print("| " + " | ".join(MARKDOWN_HEADERS) + " |", file=output)
print("| " + " | ".join("---" for _ in MARKDOWN_HEADERS) + " |", file=output)
for row in rows:
cells = [
f"[{row.name}]({row.repo})",
row.mode,
row.status,
row.recommendation,
f"`{row.latest_tag}`" if row.latest_tag else "",
row.release_date or "",
row.current_date or "",
format_cell(row.nixpkgs_vs_release_days),
row.head_date or "",
format_cell(row.head_vs_release_days),
]
print("| " + " | ".join(cells) + " |", file=output)
def print_csv(rows: list[ReportRow], output: TextIO) -> None:
writer = csv.DictWriter(output, fieldnames=list(ReportRow.__annotations__))
writer.writeheader()
for row in rows:
writer.writerow(asdict(row))
def collect_desired_state(rows: list[ReportRow]) -> dict[str, str]:
desired_state = {}
for row in rows:
if "Pin HEAD" in row.recommendation or "REGRESSION RISK" in row.recommendation:
desired_state[row.repo] = "HEAD"
elif (
row.recommendation in RELEASE_RECOMMENDATIONS
or "Move to Release" in row.recommendation
or "Consider Release" in row.recommendation
):
desired_state[row.repo] = ""
return desired_state
def write_plugin_names(
input_file: Path,
plugin_descs: list[npu.PluginDesc],
) -> None:
with open(input_file, "w", newline="") as handle:
writer = csv.DictWriter(
handle,
fieldnames=["repo", "branch", "alias"],
dialect="unix",
quoting=csv.QUOTE_NONE,
)
writer.writeheader()
for plugin in sorted(plugin_descs, key=lambda x: x.name):
writer.writerow(
{
"repo": plugin.repo.uri,
"branch": plugin.branch,
"alias": plugin.alias or "",
}
)
def update_plugin_names(
editor_config: EditorConfig,
rows: list[ReportRow],
fetch_config: npu.FetchConfig,
) -> None:
desired_state = collect_desired_state(rows)
if not desired_state:
print("No changes recommended for plugin names file.", file=sys.stderr)
return
plugin_descs = npu.load_plugins_from_csv(fetch_config, editor_config.input_file)
updated_descs = []
updated_count = 0
for plugin_desc in plugin_descs:
new_branch = desired_state.get(plugin_desc.repo.uri)
if new_branch is not None and plugin_desc.branch != new_branch:
plugin_desc = npu.PluginDesc(
plugin_desc.repo,
new_branch,
plugin_desc.alias,
)
updated_count += 1
updated_descs.append(plugin_desc)
if updated_count == 0:
print("No changes needed in plugin names file.", file=sys.stderr)
return
write_plugin_names(editor_config.input_file, updated_descs)
print(
f"Successfully updated {updated_count} entries in {editor_config.input_file.name}.",
file=sys.stderr,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Report how stale release-tracked plugins are compared to default-branch HEAD."
)
)
parser.add_argument("--editor", choices=["vim", "kakoune"], required=True)
parser.add_argument("--nixpkgs", default=str(REPO_ROOT))
parser.add_argument(
"--github-token",
default=os.getenv("GITHUB_TOKEN"),
help="GitHub token. Defaults to GITHUB_TOKEN from the environment.",
)
parser.add_argument("--proc", type=int, default=16)
parser.add_argument(
"--all",
action="store_true",
help="Include HEAD and explicit-ref plugins in the report.",
)
parser.add_argument(
"--min-days",
type=int,
default=0,
help="Filter out rows with fewer days between release and HEAD.",
)
parser.add_argument("--limit", type=int, default=50)
parser.add_argument(
"--format",
choices=["table", "csv", "markdown"],
default="table",
)
parser.add_argument(
"--output",
type=Path,
help="Write results to this file instead of stdout.",
)
parser.add_argument(
"--update",
action="store_true",
help="Directly update the input CSV file with recommended pinnings.",
)
return parser.parse_args()
def select_plugin_descs(
fetch_config: npu.FetchConfig,
editor_config: EditorConfig,
include_all: bool,
) -> list[npu.PluginDesc]:
plugin_descs = npu.load_plugins_from_csv(fetch_config, editor_config.input_file)
if include_all:
return plugin_descs
return [plugin_desc for plugin_desc in plugin_descs if plugin_desc.branch == ""]
def collect_rows(
plugin_descs: list[npu.PluginDesc],
current_plugins: dict[str, CurrentPlugin],
proc: int,
) -> list[ReportRow]:
with Pool(processes=proc) as pool:
rows = [
row
for row in pool.starmap(
analyze_plugin_safe,
[(plugin_desc, current_plugins) for plugin_desc in plugin_descs],
)
if row is not None
]
rows.sort(
key=lambda row: (
row.head_vs_release_days if row.head_vs_release_days is not None else -1,
row.current_vs_release_days
if row.current_vs_release_days is not None
else -1,
row.name,
),
reverse=True,
)
return rows
def main() -> None:
args = parse_args()
editor_config = get_editor_config(args.editor)
fetch_config = npu.FetchConfig(proc=args.proc, github_token=args.github_token)
if args.github_token is None:
print(
"warning: no GitHub token configured; unauthenticated requests may hit HTTP 429",
file=sys.stderr,
)
plugin_descs = select_plugin_descs(fetch_config, editor_config, args.all)
current_plugins = load_current_plugins(fetch_config, editor_config, args.nixpkgs)
rows = collect_rows(plugin_descs, current_plugins, args.proc)
rows = [
row
for row in rows
if row.head_vs_release_days is None or row.head_vs_release_days >= args.min_days
]
output_rows = rows[: args.limit] if args.limit > 0 else rows
output_context = nullcontext(sys.stdout)
if args.output is not None:
args.output.parent.mkdir(parents=True, exist_ok=True)
output_context = args.output.open("w", newline="")
with output_context as output:
if args.format == "csv":
print_csv(output_rows, output)
elif args.format == "markdown":
print_markdown(output_rows, output)
else:
print_table(output_rows, output)
if args.update:
update_plugin_names(editor_config, rows, fetch_config)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment