Created
January 3, 2026 01:40
-
-
Save DJStompZone/a6ecd1436b6b9daafd324e735b9319a7 to your computer and use it in GitHub Desktop.
Cloudflared Config TUI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Curses TUI for editing cloudflared config.yml. | |
| Usage: | |
| sudo python3 cftui.py | |
| sudo python3 cftui.py --config /etc/cloudflared/config.yml --backup | |
| sudo python3 cftui.py --dry-run | |
| Recommended: | |
| pip install pyyaml | |
| ©DJ Stomp 2026 | |
| "No Rights Reserved" | |
| License: MIT | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| try: | |
| import yaml # type: ignore | |
| except Exception: | |
| yaml = None | |
| CATCH_ALL_SERVICE = "http_status:404" | |
@dataclass
class IngressRule:
    """One entry of the ``ingress`` list in a cloudflared config."""

    # Hostname key of the rule; None when the rule has no hostname.
    hostname: Optional[str] = None
    # Service target string (for example a URL or "http_status:404").
    service: str = ""
    # Mapping stored under the rule's ``originRequest`` key.
    origin_request: dict[str, Any] = field(default_factory=dict)

    def is_catch_all(self) -> bool:
        """Return True for a hostname-less rule whose service is CATCH_ALL_SERVICE."""
        if self.hostname is not None:
            return False
        return self.service.strip() == CATCH_ALL_SERVICE
@dataclass
class CloudflaredConfig:
    """In-memory representation of a cloudflared config.yml.

    Holds the three settings this tool edits: the tunnel name/id, the
    credentials file path and the ordered list of ingress rules.
    """

    tunnel: str = ""
    credentials_file: str = ""
    ingress: list[IngressRule] = field(default_factory=list)

    def normalize(self) -> None:
        """Ensure there is exactly one catch-all rule and it is last.

        Rules with a blank service are dropped. When several catch-all rules
        exist only the first is kept; when none exists one is created.
        """
        non_catch = [r for r in self.ingress if not r.is_catch_all()]
        catch = [r for r in self.ingress if r.is_catch_all()]
        # Keep exactly one catch-all.
        if not catch:
            catch = [IngressRule(hostname=None, service=CATCH_ALL_SERVICE)]
        else:
            catch = [catch[0]]
        # Remove any rules missing service.
        non_catch = [r for r in non_catch if r.service.strip()]
        self.ingress = non_catch + catch

    def validate(self) -> list[str]:
        """Validate config for common cloudflared mistakes.

        Side effect: unless the ingress list is empty, the config is
        normalized (see ``normalize``) before the rules are checked.

        Returns:
            Human-readable error messages; an empty list means the config
            passed every check.
        """
        errors: list[str] = []
        if not self.tunnel.strip():
            errors.append("Missing 'tunnel' value.")
        if not self.credentials_file.strip():
            errors.append("Missing 'credentials-file' value.")
        if not self.ingress:
            errors.append("No ingress rules defined.")
            return errors

        self.normalize()
        if not self.ingress[-1].is_catch_all():
            errors.append("Catch-all rule (service: http_status:404) must be last.")

        seen: set[str] = set()
        for i, r in enumerate(self.ingress):
            if r.is_catch_all():
                continue
            if not r.hostname or not r.hostname.strip():
                errors.append(f"Ingress rule #{i+1} missing hostname.")
            # normalize() already dropped blank-service rules; kept as a safeguard.
            if not r.service.strip():
                errors.append(f"Ingress rule #{i+1} missing service.")
            key = (r.hostname or "").strip().lower()
            if not key:
                # A missing hostname is reported above; it is not a duplicate
                # of another missing hostname.
                continue
            if key in seen:
                errors.append(f"Duplicate hostname: {r.hostname}")
            seen.add(key)
        return errors

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dict suitable for YAML output.

        The config is normalized first. ``hostname`` is emitted only when it
        is not None and ``originRequest`` only when it is non-empty.
        """
        self.normalize()
        ingress_list: list[dict[str, Any]] = []
        for r in self.ingress:
            item: dict[str, Any] = {}
            if r.hostname is not None:
                item["hostname"] = r.hostname
            item["service"] = r.service
            if r.origin_request:
                item["originRequest"] = r.origin_request
            ingress_list.append(item)
        return {
            "tunnel": self.tunnel,
            "credentials-file": self.credentials_file,
            "ingress": ingress_list,
        }
def _parse_minimal_yaml(text: str) -> CloudflaredConfig:
    """
    Minimal parser for common cloudflared YAML structure.

    Supports:
        tunnel: ...
        credentials-file: ...
        ingress:
          - hostname: ...
            service: ...
          - service: http_status:404

    List items under ``ingress:`` may be indented or start in column 0
    (the layout PyYAML's default block style produces). Blank lines and
    full-line comments are skipped. Only ``hostname`` and ``service`` are
    read from each rule.

    If config is more complex, install PyYAML for full fidelity.
    """
    cfg = CloudflaredConfig()

    def strip_quotes(s: str) -> str:
        s = s.strip()
        if (s.startswith('"') and s.endswith('"')) or (s.startswith("'") and s.endswith("'")):
            return s[1:-1]
        return s

    def commit(fields: dict[str, str]) -> None:
        """Append the rule collected so far, if any keys were seen."""
        if fields:
            cfg.ingress.append(IngressRule(
                hostname=fields.get("hostname"),
                service=fields.get("service", ""),
            ))

    in_ingress = False
    current: dict[str, str] = {}
    for raw in text.splitlines():
        stripped = raw.strip()
        if not stripped or stripped.startswith("#"):
            continue

        is_item = stripped == "-" or stripped.startswith("- ")
        # An unindented "- ..." line directly under "ingress:" is still a list
        # item, not a new top-level key.
        if not raw.startswith(" ") and not (in_ingress and is_item):
            in_ingress = False
            if ":" in raw:
                key, value = raw.split(":", 1)
                key = key.strip()
                value = strip_quotes(value.strip())
                if key == "tunnel":
                    cfg.tunnel = value
                elif key == "credentials-file":
                    cfg.credentials_file = value
                elif key == "ingress":
                    in_ingress = True
            continue

        if not in_ingress:
            continue
        if is_item:
            # A new list item starts: commit the previous rule first.
            commit(current)
            current = {}
            stripped = stripped[1:].strip()
        if ":" in stripped:
            key, value = stripped.split(":", 1)
            current[key.strip()] = strip_quotes(value.strip())

    commit(current)
    cfg.normalize()
    return cfg
def load_config(path: Path) -> CloudflaredConfig:
    """Load config from YAML file, using PyYAML if available.

    Args:
        path: Location of the config file.

    Returns:
        A normalized config. When ``path`` does not exist, a config holding
        only the catch-all rule is returned.

    Raises:
        ValueError: The file parsed with PyYAML but its top level is not a
            mapping.

    Note:
        Only ``tunnel``, ``credentials-file`` and, per ingress rule,
        ``hostname``/``service``/``originRequest`` are read; other keys in the
        file are not carried into the returned object.
    """
    if not path.exists():
        return CloudflaredConfig(
            ingress=[IngressRule(hostname=None, service=CATCH_ALL_SERVICE)],
        )

    text = path.read_text(encoding="utf-8")
    if yaml is None:
        # Fallback minimal parse
        return _parse_minimal_yaml(text)

    data = yaml.safe_load(text) or {}
    if not isinstance(data, dict):
        # Fail with a clear message instead of an AttributeError on .get().
        raise ValueError(
            f"{path}: expected a YAML mapping at the top level, got {type(data).__name__}"
        )

    cfg = CloudflaredConfig(
        tunnel=str(data.get("tunnel", "") or ""),
        credentials_file=str(data.get("credentials-file", "") or ""),
        ingress=[],
    )
    raw_rules = data.get("ingress", []) or []
    if not isinstance(raw_rules, list):
        # A scalar or mapping here holds no usable rules.
        raw_rules = []
    for item in raw_rules:
        if not isinstance(item, dict):
            continue
        hostname = item.get("hostname")
        origin_request = item.get("originRequest") or {}
        cfg.ingress.append(IngressRule(
            hostname=str(hostname) if hostname is not None else None,
            service=str(item.get("service", "") or ""),
            origin_request=origin_request if isinstance(origin_request, dict) else {},
        ))
    cfg.normalize()
    return cfg
def write_config(path: Path, cfg: CloudflaredConfig, backup: bool) -> None:
    """Write config to disk in YAML format.

    Args:
        path: Destination file; missing parent directories are created.
        cfg: Config to serialize; it is normalized before writing.
        backup: When true and ``path`` already exists, copy it to
            ``<path>.bak_<YYYYmmdd_HHMMSS>`` before overwriting.

    PyYAML is used when available. Otherwise a minimal writer emits only
    ``tunnel``, ``credentials-file`` and each rule's ``hostname``/``service``.
    """
    cfg.normalize()
    text: str
    if yaml is not None:
        text = yaml.safe_dump(cfg.to_dict(), sort_keys=False)
    else:
        # Minimal YAML writer
        def q(s: str) -> str:
            """Return ``s`` as a YAML scalar, double-quoted when a plain scalar is unsafe."""
            needs_quotes = (
                not s  # an empty plain scalar would load as null
                or s[0] in "*&!|>%@`-?'\""  # leading indicator, e.g. "*.example.com"
                or any(ch in s for ch in ':#{}[], "\\')
            )
            if not needs_quotes:
                return s
            escaped = s.replace("\\", "\\\\").replace('"', '\\"')
            return f'"{escaped}"'

        lines = [
            f"tunnel: {q(cfg.tunnel)}",
            f"credentials-file: {q(cfg.credentials_file)}",
            "ingress:",
        ]
        for r in cfg.ingress:
            if r.hostname:
                lines.append(f"  - hostname: {q(r.hostname)}")
                # Must line up with "hostname" to stay in the same mapping.
                lines.append(f"    service: {q(r.service)}")
            else:
                lines.append(f"  - service: {q(r.service)}")
        text = "\n".join(lines) + "\n"

    path.parent.mkdir(parents=True, exist_ok=True)
    if backup and path.exists():
        ts = time.strftime("%Y%m%d_%H%M%S")
        backup_path = path.with_suffix(path.suffix + f".bak_{ts}")
        shutil.copy2(path, backup_path)
    path.write_text(text, encoding="utf-8")
def restart_service(service_name: str) -> tuple[bool, str]:
    """Run ``systemctl restart <service_name>``.

    Returns:
        ``(ok, message)``: ``ok`` is True when systemctl exited with status 0
        and ``message`` is its combined stdout/stderr, stripped. If running
        the command raises, ``(False, str(exception))`` is returned.
    """
    command = ["systemctl", "restart", service_name]
    try:
        completed = subprocess.run(
            command,
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # fold stderr into the captured output
            text=True,
        )
        succeeded = completed.returncode == 0
        output = completed.stdout.strip()
    except Exception as exc:
        return False, str(exc)
    return succeeded, output
| # ------------------------- Curses UI ------------------------- | |
def run_tui(cfg_path: Path, cfg: CloudflaredConfig, args: argparse.Namespace) -> int:
    """Run the interactive curses editor and return the process exit code.

    Args:
        cfg_path: File the config is saved to; also shown in the title bar.
        cfg: Config object, edited in place.
        args: Parsed CLI options; ``dry_run``, ``backup``, ``no_restart`` and
            ``service`` are read.

    Returns:
        The value of the inner key loop, which is 0 when the user quits.
    """
    import curses  # stdlib

    state: dict[str, Any] = {
        "cfg": cfg,
        "selected": 0,
        "status": "Ready.",
        "dirty": False,
    }

    def set_status(msg: str) -> None:
        state["status"] = msg

    def mark_dirty() -> None:
        state["dirty"] = True

    def set_cursor(visibility: int) -> None:
        """Change cursor visibility where the terminal supports it."""
        try:
            curses.curs_set(visibility)
        except curses.error:
            # curs_set raises when the terminal lacks this capability.
            pass

    def put(win: "curses._CursesWindow", y: int, x: int, text: str, n: int,
            attr: int = curses.A_NORMAL) -> None:
        """addnstr() that skips text falling outside the window."""
        if n <= 0:
            return
        try:
            win.addnstr(y, x, text, n, attr)
        except curses.error:
            # Terminal too small for this row; skip it rather than crash.
            pass

    def draw(stdscr: "curses._CursesWindow") -> None:
        """Repaint the whole screen from ``state``."""
        stdscr.erase()
        h, w = stdscr.getmaxyx()
        title = f"cloudflared TUI — {cfg_path} " + ("*MODIFIED*" if state["dirty"] else "")
        put(stdscr, 0, 0, title, w - 1, curses.A_BOLD)

        cfg_local: CloudflaredConfig = state["cfg"]
        cfg_local.normalize()
        rules = cfg_local.ingress
        # normalize() may have dropped rules; keep the selection in range.
        state["selected"] = max(0, min(state["selected"], len(rules) - 1))
        selected = state["selected"]

        put(stdscr, 2, 0, f"Tunnel: {cfg_local.tunnel or '[unset]'}", w - 1)
        put(stdscr, 3, 0, f"Creds: {cfg_local.credentials_file or '[unset]'}", w - 1)
        put(stdscr, 5, 0, "Ingress rules:", w - 1, curses.A_UNDERLINE)

        top = 6
        visible = max(0, h - top - 4)
        # Scroll so the selected rule stays inside the visible rows.
        first = max(0, selected - visible + 1) if visible else 0
        for row, idx in enumerate(range(first, min(len(rules), first + visible))):
            r = rules[idx]
            sel = idx == selected
            # Both prefixes are two characters wide so columns line up.
            prefix = "➤ " if sel else "  "
            if r.is_catch_all():
                line = f"{prefix}[catch-all] service = {r.service}"
                attr = curses.A_DIM
            else:
                # hostname can be None; formatting None with "<35" raises TypeError.
                host = r.hostname or "[no hostname]"
                line = f"{prefix}{host:<35} -> {r.service}"
                attr = curses.A_REVERSE if sel else curses.A_NORMAL
            put(stdscr, top + row, 0, line, w - 1, attr)

        help_lines = [
            "Keys: ↑/↓ select | a add | e edit | d delete | t edit tunnel/creds | v validate | s save | r restart | q quit",
            "Tip: Keep fallback.nors.win as a hostname rule so Custom Hostnames stops yelling.",
        ]
        put(stdscr, h - 3, 0, help_lines[0], w - 1, curses.A_DIM)
        put(stdscr, h - 2, 0, help_lines[1], w - 1, curses.A_DIM)
        put(stdscr, h - 1, 0, f"Status: {state['status']}", w - 1, curses.A_BOLD)
        stdscr.refresh()

    def prompt(stdscr: "curses._CursesWindow", title: str, initial: str = "") -> Optional[str]:
        """Show a one-line input box.

        Returns the stripped text on Enter, or None on Esc or when the
        terminal is too small to show the box.
        """
        h, w = stdscr.getmaxyx()
        win_h = 7
        win_w = min(w - 4, 80)
        if h < win_h or win_w < 8:
            return None
        win_y = (h - win_h) // 2
        win_x = (w - win_w) // 2
        win = curses.newwin(win_h, win_w, win_y, win_x)
        # Deliver arrow keys as single key codes, not as a bare Esc (27).
        win.keypad(True)
        field_w = win_w - 6  # columns left after the border margin and "> "
        buf = list(initial)
        set_cursor(1)
        try:
            while True:
                win.erase()
                win.border()
                put(win, 1, 2, title, win_w - 4, curses.A_BOLD)
                put(win, 2, 2, "Enter to confirm | Esc to cancel", win_w - 4, curses.A_DIM)
                # Show the tail of long input so the cursor stays in the box.
                shown = "".join(buf)[-field_w:]
                put(win, 3, 2, "> " + shown, win_w - 4)
                win.move(3, 4 + len(shown))
                win.refresh()
                ch = win.getch()
                if ch in (10, 13, curses.KEY_ENTER):
                    return "".join(buf).strip()
                if ch == 27:
                    return None
                if ch in (curses.KEY_BACKSPACE, 127, 8):
                    if buf:
                        buf.pop()
                elif 32 <= ch <= 126:
                    buf.append(chr(ch))
        finally:
            set_cursor(0)

    def main(stdscr: "curses._CursesWindow") -> int:
        """Key loop: redraw, read one key, apply the matching action."""
        set_cursor(0)
        stdscr.nodelay(False)
        stdscr.keypad(True)
        while True:
            draw(stdscr)
            ch = stdscr.getch()
            cfg_local: CloudflaredConfig = state["cfg"]
            cfg_local.normalize()

            if ch in (ord("q"), 27):
                if state["dirty"]:
                    ans = prompt(stdscr, "Unsaved changes. Type 'yes' to quit anyway:", "")
                    if ans != "yes":
                        set_status("Quit canceled.")
                        continue
                return 0

            if ch == curses.KEY_UP:
                state["selected"] = max(0, state["selected"] - 1)
            elif ch == curses.KEY_DOWN:
                state["selected"] = min(len(cfg_local.ingress) - 1, state["selected"] + 1)
            elif ch == ord("a"):
                hostname = prompt(stdscr, "Hostname (e.g. fallback.nors.win or zero.stomp.zone):", "")
                if hostname is None:
                    set_status("Add canceled.")
                    continue
                service = prompt(stdscr, "Service (e.g. http://localhost:3000):", "http://localhost:3000")
                if service is None:
                    set_status("Add canceled.")
                    continue
                if not service:
                    # normalize() drops blank-service rules, so say so here.
                    set_status("Service must not be empty; nothing added.")
                    continue
                # Insert before the trailing catch-all rule.
                cfg_local.ingress.insert(
                    max(0, len(cfg_local.ingress) - 1),
                    IngressRule(hostname=hostname, service=service),
                )
                cfg_local.normalize()
                state["selected"] = min(state["selected"], len(cfg_local.ingress) - 1)
                mark_dirty()
                set_status(f"Added rule for {hostname}.")
            elif ch == ord("e"):
                idx = state["selected"]
                if idx < 0 or idx >= len(cfg_local.ingress):
                    continue
                rule = cfg_local.ingress[idx]
                if rule.is_catch_all():
                    set_status("Catch-all rule is managed automatically. Edit other rules instead.")
                    continue
                hostname = prompt(stdscr, "Edit hostname:", rule.hostname or "")
                if hostname is None:
                    set_status("Edit canceled.")
                    continue
                service = prompt(stdscr, "Edit service:", rule.service)
                if service is None:
                    set_status("Edit canceled.")
                    continue
                if not service:
                    # A blank service would make normalize() delete the rule.
                    set_status("Service must not be empty; rule unchanged.")
                    continue
                cfg_local.ingress[idx] = IngressRule(
                    hostname=hostname,
                    service=service,
                    origin_request=rule.origin_request,
                )
                cfg_local.normalize()
                mark_dirty()
                set_status(f"Updated rule #{idx+1}.")
            elif ch == ord("d"):
                idx = state["selected"]
                if idx < 0 or idx >= len(cfg_local.ingress):
                    continue
                rule = cfg_local.ingress[idx]
                if rule.is_catch_all():
                    set_status("Can't delete catch-all rule.")
                    continue
                ans = prompt(stdscr, f"Type 'del' to delete {rule.hostname}:", "")
                if ans != "del":
                    set_status("Delete canceled.")
                    continue
                cfg_local.ingress.pop(idx)
                cfg_local.normalize()
                state["selected"] = max(0, min(state["selected"], len(cfg_local.ingress) - 1))
                mark_dirty()
                set_status("Rule deleted.")
            elif ch == ord("t"):
                tunnel = prompt(stdscr, "Set tunnel name/id:", cfg_local.tunnel)
                if tunnel is None:
                    set_status("Tunnel edit canceled.")
                    continue
                creds = prompt(stdscr, "Set credentials-file path:", cfg_local.credentials_file)
                if creds is None:
                    set_status("Creds edit canceled.")
                    continue
                cfg_local.tunnel = tunnel
                cfg_local.credentials_file = creds
                mark_dirty()
                set_status("Updated tunnel + credentials-file.")
            elif ch == ord("v"):
                errs = cfg_local.validate()
                if errs:
                    set_status("Validation failed: " + " | ".join(errs[:3]) + (" ..." if len(errs) > 3 else ""))
                else:
                    set_status("Validation OK.")
            elif ch == ord("s"):
                errs = cfg_local.validate()
                if errs:
                    set_status("Fix validation errors before saving.")
                    continue
                if args.dry_run:
                    set_status("Dry run: not writing config.")
                    continue
                try:
                    write_config(cfg_path, cfg_local, backup=args.backup)
                    state["dirty"] = False
                    set_status("Saved config.")
                except Exception as e:
                    # Show the failure in the status bar and keep the editor open.
                    set_status(f"Save failed: {e}")
            elif ch == ord("r"):
                if args.no_restart:
                    set_status("Restart disabled via --no-restart.")
                    continue
                if state["dirty"]:
                    set_status("Save first before restarting.")
                    continue
                ok, msg = restart_service(args.service)
                if ok:
                    set_status(f"Restarted {args.service}.")
                else:
                    set_status(f"Restart failed: {msg[:120]}")

    return curses.wrapper(main)
def main() -> int:
    """Parse the command line, load the config and start the TUI.

    Returns:
        The exit code produced by ``run_tui``.
    """
    cli = argparse.ArgumentParser(description="Curses TUI for cloudflared config.yml")
    option_specs = [
        ("--config", {"default": "/etc/cloudflared/config.yml", "help": "Path to config.yml"}),
        ("--backup", {"action": "store_true", "help": "Create a timestamped backup before saving"}),
        ("--dry-run", {"action": "store_true", "help": "Do not write changes to disk"}),
        ("--service", {"default": "cloudflared", "help": "systemd service name to restart"}),
        ("--no-restart", {"action": "store_true", "help": "Disable restart option"}),
    ]
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    args = cli.parse_args()

    cfg_path = Path(args.config)
    running_as_root = os.geteuid() == 0
    if not running_as_root and str(cfg_path).startswith("/etc/"):
        # Warning only; startup continues without root.
        print("Run this with sudo if you want to edit /etc/cloudflared/config.yml.", file=sys.stderr)

    loaded = load_config(cfg_path)
    return run_tui(cfg_path, loaded, args)
# Script entry point: run main() and exit with its return value.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment