|
#!/usr/bin/env python3 |
|
"""
Single-file dotfiles installer.

Usage: python3 install.py [--dry_run]

Reads dotfiles.toml from the same directory and performs two actions:

1. Links — creates symlinks from destination paths (supporting ~ and $VAR
   expansion) to source files inside the repository. Existing regular files
   or directories at the destination are timestamped and moved aside before
   the symlink is created.

2. Packages — installs packages using the first available backend for each
   entry. Backend priority (highest to lowest): apt, pacman, paru, flatpak,
   shell. Per-package pre/post shell hooks and global pre_install/post_install
   hooks are supported.

Pass --dry_run to print all actions that would be taken without making any
changes to the filesystem.
"""
|
|
|
from __future__ import annotations |
|
|
|
import argparse |
|
import logging |
|
import os |
|
import shlex |
|
import shutil |
|
import subprocess |
|
import sys |
|
from abc import ABC, abstractmethod |
|
from datetime import datetime |
|
from pathlib import Path |
|
from typing import Any |
|
|
|
try: |
|
import tomllib # type: ignore |
|
except ImportError: # pragma: no cover - tomllib is required on Python 3.11+ |
|
print("ERROR: Python 3.11+ is required (tomllib missing)") |
|
sys.exit(2) |
|
|
|
|
|
LOG_FORMAT = "%(levelname)s: %(message)s" |
|
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class CommandRunner:
    """Run external commands, adding sudo when needed and honouring dry-run mode."""

    def __init__(self, dry_run: bool) -> None:
        """Store the dry-run flag and detect whether the process runs as root.

        Args:
            dry_run: When True, commands are printed instead of executed.
        """
        self.dry_run = dry_run
        # os.geteuid is missing on non-POSIX platforms; treat that as "not root".
        euid_getter = getattr(os, "geteuid", None)
        self.is_root = euid_getter is not None and euid_getter() == 0

    def _format(self, cmd_list: list[str], needs_sudo: bool) -> list[str]:
        """Return the argv to execute, prefixed with sudo if elevation is needed.

        The list is returned unchanged when no elevation is requested or the
        process is already root.
        """
        must_elevate = needs_sudo and not self.is_root
        return (["sudo"] + cmd_list) if must_elevate else cmd_list

    def run_cmd(self, cmd_list: list[str], needs_sudo: bool) -> int:
        """Execute a command and return its exit code.

        In dry-run mode the shell-quoted command line is printed to stdout and
        0 is returned without running anything.

        Args:
            cmd_list: Program and arguments.
            needs_sudo: Whether the command requires elevated privileges.

        Returns:
            The process exit code, or 0 in dry-run mode.
        """
        argv = self._format(cmd_list, needs_sudo)
        rendered = " ".join(map(shlex.quote, map(str, argv)))
        if not self.dry_run:
            logger.info("Running command: %s", rendered)
            completed = subprocess.run(
                argv,
                stdin=sys.stdin,
                stdout=sys.stdout,
                stderr=sys.stderr,
                check=False,
            )
            return completed.returncode
        print(rendered)
        return 0

    def run_shell(self, cmd_str: str, needs_sudo: bool) -> int:
        """Run a shell snippet through ``bash -lc`` and return its exit code.

        A login shell is used so that the user's PATH and environment
        (e.g. nvm, cargo) are visible to hook commands.
        """
        login_shell_argv = ["bash", "-lc", cmd_str]
        return self.run_cmd(login_shell_argv, needs_sudo)
|
|
|
|
|
class Installer(ABC):
    """Common interface implemented by every package-manager backend."""

    # Key under which a package table selects this backend (e.g. "apt").
    name: str
    # Whether install commands for this backend require elevated privileges.
    needs_sudo: bool

    @abstractmethod
    def is_available(self) -> bool:
        """Report whether this backend can be used on the current system."""

    @abstractmethod
    def is_installed(self, pkg_spec: str) -> bool:
        """Report whether *pkg_spec* is already installed."""

    @abstractmethod
    def install(self, pkg_spec: str, runner: CommandRunner) -> int:
        """Install *pkg_spec* through *runner* and return the exit code."""
|
|
|
|
|
class AptInstaller(Installer):
    """Backend for Debian/Ubuntu systems using apt-get (or apt as a fallback)."""

    name = "apt"
    needs_sudo = True

    @staticmethod
    def _executable() -> str:
        """Return the apt front-end to invoke.

        ``apt-get`` is preferred; ``apt`` is used only when ``apt-get`` is not
        on PATH, so that ``install`` agrees with what ``is_available`` accepts.
        """
        if shutil.which("apt-get") is None and shutil.which("apt") is not None:
            return "apt"
        return "apt-get"

    def is_available(self) -> bool:
        """Return True if apt-get or apt is on PATH."""
        return shutil.which("apt-get") is not None or shutil.which("apt") is not None

    def is_installed(self, pkg_spec: str) -> bool:
        """Return True only if dpkg reports *pkg_spec* as fully installed.

        ``dpkg -s`` exits 0 for any package known to the database, including
        ones that were removed but still have configuration files, so the
        exit code alone is not sufficient. The ``${Status}`` field
        ("want flag status") is inspected instead and must end in
        ``installed``.
        """
        proc = subprocess.run(
            # "\\n" passes a literal backslash-n, which dpkg-query expands to a
            # newline; this keeps multi-arch results on separate lines.
            ["dpkg-query", "-W", "-f", "${Status}\\n", pkg_spec],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            check=False,
        )
        if proc.returncode != 0:
            return False
        return any(
            line.split()[-1:] == ["installed"] for line in proc.stdout.splitlines()
        )

    def install(self, pkg_spec: str, runner: CommandRunner) -> int:
        """Install *pkg_spec* non-interactively and return the exit code."""
        return runner.run_cmd(
            [self._executable(), "install", "-y", pkg_spec], self.needs_sudo
        )
|
|
|
|
|
class PacmanInstaller(Installer):
    """Arch Linux backend driven by pacman."""

    name = "pacman"
    needs_sudo = True

    def is_available(self) -> bool:
        """Report whether a pacman executable can be found on PATH."""
        located = shutil.which("pacman")
        return located is not None

    def is_installed(self, pkg_spec: str) -> bool:
        """Report whether ``pacman -Q`` finds *pkg_spec* in the local database."""
        query = ["pacman", "-Q", pkg_spec]
        exit_code = subprocess.run(
            query,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        ).returncode
        return exit_code == 0

    def install(self, pkg_spec: str, runner: CommandRunner) -> int:
        """Install *pkg_spec* with pacman (``--needed --noconfirm``) and return the exit code."""
        argv = ["pacman", "-S", "--needed", "--noconfirm", pkg_spec]
        return runner.run_cmd(argv, self.needs_sudo)
|
|
|
|
|
class ParuInstaller(Installer):
    """AUR backend driven by paru."""

    name = "paru"
    needs_sudo = False

    def is_available(self) -> bool:
        """Report whether a paru executable can be found on PATH."""
        located = shutil.which("paru")
        return located is not None

    def is_installed(self, pkg_spec: str) -> bool:
        """Report whether ``pacman -Q`` finds *pkg_spec* in the local database.

        The check goes through pacman rather than paru: packages installed by
        paru are recorded in pacman's local database, so one query covers both
        native and AUR packages.
        """
        query = ["pacman", "-Q", pkg_spec]
        exit_code = subprocess.run(
            query,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        ).returncode
        return exit_code == 0

    def install(self, pkg_spec: str, runner: CommandRunner) -> int:
        """Install *pkg_spec* with ``paru -S --noconfirm`` and return the exit code."""
        argv = ["paru", "-S", "--noconfirm", pkg_spec]
        return runner.run_cmd(argv, self.needs_sudo)
|
|
|
|
|
class FlatpakInstaller(Installer):
    """Flatpak application backend."""

    name = "flatpak"
    needs_sudo = False

    def is_available(self) -> bool:
        """Report whether a flatpak executable can be found on PATH."""
        located = shutil.which("flatpak")
        return located is not None

    @staticmethod
    def _parse(pkg_spec: str) -> tuple[str, str]:
        """Break *pkg_spec* into a ``(remote, ref)`` pair.

        The spec may be written as ``remote:ref``; only the first colon
        separates the two parts. Without a colon the remote is ``flathub``.
        """
        remote, separator, ref = pkg_spec.partition(":")
        if not separator:
            return "flathub", pkg_spec
        return remote, ref

    def is_installed(self, pkg_spec: str) -> bool:
        """Report whether ``flatpak info`` succeeds for the spec's ref."""
        ref = self._parse(pkg_spec)[1]
        exit_code = subprocess.run(
            ["flatpak", "info", ref],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        ).returncode
        return exit_code == 0

    def install(self, pkg_spec: str, runner: CommandRunner) -> int:
        """Install the spec's ref from its remote and return the exit code."""
        remote, ref = self._parse(pkg_spec)
        argv = ["flatpak", "install", "-y", remote, ref]
        return runner.run_cmd(argv, self.needs_sudo)
|
|
|
|
|
class ShellInstaller(Installer):
    """Last-resort backend whose "package spec" is a shell command to run."""

    name = "shell"
    needs_sudo = False

    def is_available(self) -> bool:
        """Report availability unconditionally; bash is assumed to exist."""
        return True

    def is_installed(self, pkg_spec: str) -> bool:
        """Report "not installed" unconditionally.

        A shell command has no installed state that could be queried, so it
        is executed on every run. Keep such commands idempotent, or use
        pre/post hooks, to avoid repeated side effects.
        """
        return False

    def install(self, pkg_spec: str, runner: CommandRunner) -> int:
        """Run *pkg_spec* as a shell command via *runner* and return the exit code."""
        exit_code = runner.run_shell(pkg_spec, self.needs_sudo)
        return exit_code
|
|
|
|
|
def load_config(repo_root: Path) -> dict[str, Any]: |
|
"""Load and return the parsed dotfiles.toml configuration. |
|
|
|
Exits with code 2 on missing file or parse error. |
|
""" |
|
cfg_path = repo_root / "dotfiles.toml" |
|
if not cfg_path.exists(): |
|
logger.error("dotfiles.toml not found in repo root: %s", cfg_path) |
|
sys.exit(2) |
|
try: |
|
data = tomllib.loads(cfg_path.read_text()) |
|
except Exception as e: |
|
logger.error("Failed to parse dotfiles.toml: %s", e) |
|
sys.exit(2) |
|
return data |
|
|
|
|
|
def validate_config(cfg: dict[str, Any], repo_root: Path) -> None:
    """Validate the configuration, exiting with code 2 on any error.

    Checks that ``links`` is an array of tables, that every entry has string
    ``src`` and ``dst`` keys, and that ``src`` resolves to an existing path
    inside the repository.

    Args:
        cfg: Parsed configuration.
        repo_root: Repository root; it is resolved here so the containment
            check also works when the caller passes a relative or
            non-canonical path.
    """
    root = repo_root.resolve()
    links = cfg.get("links", [])
    if not isinstance(links, list):
        logger.error("'links' must be an array of tables")
        sys.exit(2)

    for i, link in enumerate(links):
        if not isinstance(link, dict):
            logger.error("Invalid links entry at index %d: expected a table", i)
            sys.exit(2)
        for key in ("src", "dst"):
            if key not in link:
                logger.error("links[%d] missing '%s'", i, key)
                sys.exit(2)
        # Later path handling (Path joining, expandvars) needs strings; reject
        # other TOML types here instead of crashing during linking.
        for key in ("src", "dst"):
            if not isinstance(link[key], str):
                logger.error("links[%d] '%s' must be a string", i, key)
                sys.exit(2)

        src = link["src"]
        try:
            # strict=True makes resolve() fail when the path does not exist.
            src_path = (root / src).resolve(strict=True)
        except (OSError, RuntimeError, ValueError):
            logger.error("links[%d] src does not exist: %s", i, src)
            sys.exit(2)
        if not src_path.is_relative_to(root):
            logger.error("links[%d] src is outside repo: %s", i, src)
            sys.exit(2)
|
|
|
|
|
def ensure_parent(dst_path: Path, dry_run: bool) -> None:
    """Make sure the directory that will hold *dst_path* exists.

    Nothing happens when the parent already exists. Otherwise it is created
    together with any missing ancestors, or only reported in dry-run mode.
    """
    directory = dst_path.parent
    if directory.exists():
        return
    if not dry_run:
        logger.info("Creating parent directory: %s", directory)
        directory.mkdir(parents=True, exist_ok=True)
        return
    logger.info("Would create parent directory: %s", directory)
|
|
|
|
|
def _symlink_or_exit(dst_path: Path, src_path: Path) -> None:
    """Create *dst_path* as a symlink to *src_path*; exit with code 1 on failure."""
    try:
        dst_path.symlink_to(src_path)
    except (OSError, NotImplementedError) as e:
        logger.error("Failed to create symlink %s -> %s: %s", dst_path, src_path, e)
        sys.exit(1)


def process_links(
    cfg: dict[str, Any], repo_root: Path, dry_run: bool, summary: dict[str, int]
) -> None:
    """Create or repair symlinks declared in the config.

    For each ``links`` entry the destination (after ``$VAR`` and ``~``
    expansion) is handled as follows:

    * nothing exists there: a symlink is created;
    * a symlink already pointing at the source: it is left alone;
    * any other symlink: it is replaced;
    * a regular file or directory: it is renamed to
      ``<name>.<timestamp>.bak`` and the symlink is created.

    In dry-run mode the same decisions are logged and counted but the
    filesystem is not modified.

    Args:
        cfg: Parsed configuration; ``links`` entries must contain ``src`` and
            ``dst``.
        repo_root: Directory against which ``src`` is resolved.
        dry_run: When True, only log what would be done.
        summary: Counter dictionary; ``links_created``, ``links_skipped`` and
            ``backups_made`` are incremented in place.

    Exits with code 1 if a filesystem operation fails.
    """
    for link in cfg.get("links", []):
        src_path = (repo_root / link["src"]).resolve()
        dst_expanded = os.path.expanduser(os.path.expandvars(link["dst"]))
        dst_path = Path(dst_expanded).absolute()

        ensure_parent(dst_path, dry_run)

        # lexists() is True for broken symlinks as well. It is used instead of
        # Path.exists(follow_symlinks=False) because that keyword only exists
        # from Python 3.12, while this script supports 3.11.
        if not os.path.lexists(dst_path):
            if dry_run:
                logger.info("Would symlink %s -> %s", dst_path, src_path)
            else:
                logger.info("Creating symlink %s -> %s", dst_path, src_path)
                _symlink_or_exit(dst_path, src_path)
            summary["links_created"] += 1
            continue

        if dst_path.is_symlink():
            try:
                target = dst_path.resolve()
            except (OSError, RuntimeError):
                # An unresolvable link (e.g. a symlink loop) is treated as
                # pointing at the wrong place.
                target = None
            if target == src_path:
                logger.info("Symlink already correct for %s", dst_path)
                summary["links_skipped"] += 1
                continue
            if dry_run:
                logger.info(
                    "Would replace symlink %s -> %s (was -> %s)",
                    dst_path,
                    src_path,
                    target,
                )
            else:
                logger.info(
                    "Replacing symlink %s (was -> %s) -> %s",
                    dst_path,
                    target,
                    src_path,
                )
                try:
                    dst_path.unlink()
                    dst_path.symlink_to(src_path)
                except (OSError, NotImplementedError) as e:
                    logger.error("Failed replacing symlink %s: %s", dst_path, e)
                    sys.exit(1)
            summary["links_created"] += 1
            continue

        # dst exists and is not a symlink — back it up before overwriting
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        backup_name = dst_path.with_name(dst_path.name + "." + timestamp + ".bak")
        if dry_run:
            logger.warning("Would backup %s -> %s", dst_path, backup_name)
            logger.info("Would symlink %s -> %s", dst_path, src_path)
        else:
            logger.warning("Backing up %s -> %s", dst_path, backup_name)
            try:
                # shutil.move handles cross-device moves unlike os.rename
                shutil.move(str(dst_path), str(backup_name))
            except OSError as e:
                logger.error("Failed to backup %s -> %s: %s", dst_path, backup_name, e)
                sys.exit(1)
            _symlink_or_exit(dst_path, src_path)
        summary["backups_made"] += 1
        summary["links_created"] += 1
|
|
|
|
|
def _package_hooks(ptab: dict[str, Any], key: str) -> list[str]:
    """Return the hook commands stored under *key* as a list.

    A hook written as a single string is wrapped in a list; iterating the
    string directly would run each character as its own command.
    """
    hooks = ptab.get(key) or []
    if isinstance(hooks, str):
        return [hooks]
    return list(hooks)


def process_packages(
    cfg: dict[str, Any],
    installers: list[Installer],
    runner: CommandRunner,
    summary: dict[str, int],
) -> None:
    """Install packages declared in the config using the first available backend.

    For each entry of the ``packages`` table the first installer (in the order
    of *installers*) whose name is a key of the entry and which is available
    is selected. The entry's ``pre`` hooks run, the package is installed
    unless the backend reports it as already installed, then the ``post``
    hooks run.

    Args:
        cfg: Parsed configuration.
        installers: Backends in priority order.
        runner: Command runner used for hooks and installs.
        summary: Counter dictionary; ``packages_installed``,
            ``packages_already_installed`` and ``packages_skipped`` are
            incremented in place.

    Exits with the failing command's exit code when a hook or install fails,
    and with code 1 when the installed-state query raises.
    """
    pkgs = cfg.get("packages", {}) or {}
    if not isinstance(pkgs, dict):
        logger.warning("'packages' is not a table, skipping package installation")
        return

    for pkg_id, ptab in pkgs.items():
        logger.info("Processing package: %s", pkg_id)
        if not isinstance(ptab, dict):
            logger.warning("Package %s: invalid table, skipping", pkg_id)
            continue

        # Walk the ordered list to find the highest-priority available backend
        selected: Installer | None = None
        pkg_spec: str | None = None
        for inst in installers:
            if inst.name in ptab and inst.is_available():
                selected = inst
                pkg_spec = ptab[inst.name]
                break

        if selected is None or pkg_spec is None:
            logger.warning("No usable installer for package %s, skipping", pkg_id)
            summary["packages_skipped"] += 1
            continue

        for cmd in _package_hooks(ptab, "pre"):
            logger.info("Running pre hook for %s: %s", pkg_id, cmd)
            rc = runner.run_shell(cmd, needs_sudo=False)
            if rc != 0:
                logger.error("Pre-hook failed for %s: %s", pkg_id, cmd)
                sys.exit(rc)

        try:
            installed = selected.is_installed(str(pkg_spec))
        except Exception as e:
            # Backends shell out to query tools; any failure there (e.g. the
            # tool is missing) is fatal for the run.
            logger.error("Error checking installation state for %s: %s", pkg_id, e)
            sys.exit(1)

        if installed:
            logger.info("Package %s already installed (%s)", pkg_id, pkg_spec)
            summary["packages_already_installed"] += 1
        else:
            logger.info("Installing %s using %s: %s", pkg_id, selected.name, pkg_spec)
            rc = selected.install(str(pkg_spec), runner)
            if rc != 0:
                logger.error(
                    "Installer %s failed for %s (rc=%d)", selected.name, pkg_id, rc
                )
                sys.exit(rc)
            summary["packages_installed"] += 1

        for cmd in _package_hooks(ptab, "post"):
            logger.info("Running post hook for %s: %s", pkg_id, cmd)
            rc = runner.run_shell(cmd, needs_sudo=False)
            if rc != 0:
                logger.error("Post-hook failed for %s: %s", pkg_id, cmd)
                sys.exit(rc)
|
|
|
|
|
def run_hooks(
    hooks: list[str] | str | None, runner: CommandRunner, name: str
) -> None:
    """Run global shell hook commands, exiting on the first failure.

    Args:
        hooks: Commands to run. A single string is treated as one command
            (iterating it directly would run each character separately);
            ``None`` or an empty value means there is nothing to do.
        runner: Command runner used to execute each hook without sudo.
        name: Hook stage name used in log messages (e.g. ``pre_install``).

    Exits with the hook's exit code if a command returns non-zero.
    """
    if not hooks:
        return
    commands = [hooks] if isinstance(hooks, str) else hooks
    for cmd in commands:
        logger.info("Running %s hook: %s", name, cmd)
        rc = runner.run_shell(cmd, needs_sudo=False)
        if rc != 0:
            logger.error("Global %s hook failed: %s", name, cmd)
            sys.exit(rc)
|
|
|
|
|
def print_summary(summary: dict[str, int]) -> None:
    """Log one line per counter, preceded by a "Summary:" header.

    Counters missing from *summary* are reported as 0.
    """
    logger.info("Summary:")
    # (log template, counter key) pairs in the order they are reported.
    rows = (
        (" links_created: %d", "links_created"),
        (" links_skipped: %d", "links_skipped"),
        (" backups_made: %d", "backups_made"),
        (" packages_installed: %d", "packages_installed"),
        (" packages_already_installed: %d", "packages_already_installed"),
        (" packages_skipped: %d", "packages_skipped"),
    )
    for template, key in rows:
        logger.info(template, summary.get(key, 0))
|
|
|
|
|
def _global_hook_commands(cfg: dict[str, Any], section: str) -> Any:
    """Return ``cfg[section]["commands"]``, or None if *section* is not a table."""
    table = cfg.get(section)
    return table.get("commands") if isinstance(table, dict) else None


def main() -> int:
    """Entry point — parse arguments, load config, and run the install pipeline.

    Steps, in order: load and validate ``dotfiles.toml`` from the directory
    containing this script, run global ``pre_install`` hooks, process links,
    process packages, run global ``post_install`` hooks, and log a summary.

    Returns:
        0 on success. Failures terminate the process via ``sys.exit`` inside
        the individual steps.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dry_run",
        # The hyphenated spelling is accepted as an alias of the original flag.
        "--dry-run",
        dest="dry_run",
        action="store_true",
        help="Print plan only, do not perform changes",
    )
    args = parser.parse_args()

    dry_run = args.dry_run
    repo_root = Path(__file__).resolve().parent

    cfg = load_config(repo_root)
    validate_config(cfg, repo_root)

    runner = CommandRunner(dry_run=dry_run)

    # Order determines priority: the first available backend wins for each package
    installers: list[Installer] = [
        AptInstaller(),
        PacmanInstaller(),
        ParuInstaller(),
        FlatpakInstaller(),
        ShellInstaller(),
    ]

    summary: dict[str, int] = {
        "links_created": 0,
        "links_skipped": 0,
        "backups_made": 0,
        "packages_installed": 0,
        "packages_already_installed": 0,
        "packages_skipped": 0,
    }

    run_hooks(_global_hook_commands(cfg, "pre_install"), runner, "pre_install")

    process_links(cfg, repo_root, dry_run, summary)
    process_packages(cfg, installers, runner, summary)

    run_hooks(_global_hook_commands(cfg, "post_install"), runner, "post_install")

    print_summary(summary)

    return 0
|
|
|
|
|
if __name__ == "__main__":
    # SystemExit derives from BaseException, so the sys.exit() calls made
    # inside the pipeline pass straight through this handler.
    try:
        exit_code = main()
    except Exception as e:
        logger.error("Uncaught error: %s", e)
        sys.exit(1)
    # Propagate main()'s return value instead of always reporting success.
    sys.exit(exit_code)