#!/usr/bin/env python3
|
|
|
import argparse |
|
import base64 |
|
import json |
|
import secrets |
|
import string |
|
import subprocess |
|
import sys |
|
from typing import List |
|
|
|
|
|
# -----------------------------
# Utilities
# -----------------------------
|
|
|
def run(cmd: List[str], capture_json=False, allow_fail=False):
    """Run an external command and return its standard output.

    Args:
        cmd: Argument vector; it is executed directly, without a shell.
        capture_json: When true, parse stdout as JSON and return the result.
        allow_fail: When true, a non-zero exit status is tolerated instead of
            terminating the script.

    Returns:
        The parsed JSON document when ``capture_json`` is set, otherwise stdout
        with surrounding whitespace stripped. ``None`` is returned when a
        tolerated failure (``allow_fail``) produced no parseable JSON.

    The script exits with status 1 when the executable cannot be started,
    when the command fails and ``allow_fail`` is false, or when a successful
    command prints output that is not valid JSON while ``capture_json`` is set.
    """
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except OSError as exc:
        # Raised when the binary is missing or not executable; report it in
        # the same style as a failed command rather than with a traceback.
        print(f"\n❌ Could not execute: {' '.join(cmd)}")
        print(exc)
        sys.exit(1)

    failed = result.returncode != 0

    if failed and not allow_fail:
        print(f"\n❌ Command failed: {' '.join(cmd)}")
        print(result.stderr.strip())
        sys.exit(1)

    if capture_json:
        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError:
            if failed:
                # A tolerated failure usually leaves stdout empty or partial,
                # so there is no document to hand back.
                return None
            print(f"\n❌ Could not parse JSON output of: {' '.join(cmd)}")
            sys.exit(1)

    return result.stdout.strip()
|
|
|
|
|
def confirm(prompt: str) -> bool:
    """Ask a yes/no question on the terminal until a recognised answer is given.

    The answer is stripped and lower-cased. ``y``/``yes`` yields ``True``;
    an empty line, ``n`` or ``no`` yields ``False``; anything else re-prompts.
    """
    affirmative = ("y", "yes")
    negative = ("", "n", "no")
    question = f"{prompt} [y/N]: "

    reply = input(question).strip().lower()
    while reply not in affirmative and reply not in negative:
        reply = input(question).strip().lower()

    return reply in affirmative
|
|
|
|
|
# -----------------------------
# Sensitive value regeneration
# -----------------------------
|
|
|
# Every character that may appear in standard (non-URL-safe) base64 text,
# including the "=" padding character.
BASE64_CHARS = set(string.ascii_letters) | set(string.digits) | set("+/=")


def looks_base64(value: str) -> bool:
    """Heuristically decide whether *value* is shaped like base64 text.

    The check is purely structural: the length must be a multiple of four and
    every character must belong to ``BASE64_CHARS``. The value is never
    decoded, and the empty string passes the check.
    """
    if len(value) % 4:
        return False
    return all(char in BASE64_CHARS for char in value)
|
|
|
|
|
def regenerate_value(original: str) -> str:
    """Return a fresh random secret with the same length and shape as *original*.

    The shape is guessed from the characters of *original*:

    * hexadecimal digits in one consistent letter case -> random hex digits in
      that case (a digits-only value is treated as lower-case hex);
    * base64-looking text (see ``looks_base64``) -> base64 of random bytes;
    * anything else -> random ASCII letters and digits.

    Hex is tested first because a hex token whose length is a multiple of four
    also passes the base64 check, and regenerating it as base64 would yield
    characters a hex decoder rejects.
    """
    length = len(original)

    for hex_alphabet in ("0123456789abcdef", "0123456789ABCDEF"):
        if original and set(original) <= set(hex_alphabet):
            return "".join(secrets.choice(hex_alphabet) for _ in range(length))

    if looks_base64(original):
        # looks_base64 guarantees length is a multiple of 4, so this many raw
        # bytes encode to exactly `length` characters with no padding.
        raw_len = length // 4 * 3
        return base64.b64encode(secrets.token_bytes(raw_len)).decode()[:length]

    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
|
# -----------------------------
# Core operations
# -----------------------------
|
|
|
def get_app_info(app: str) -> dict:
    """Return the parsed JSON printed by ``heroku apps:info <app> --json``."""
    command = ["heroku", "apps:info", app, "--json"]
    return run(command, capture_json=True)
|
|
|
def app_exists(app: str) -> bool:
    """Report whether ``heroku apps:info <app>`` exits successfully.

    All command output is discarded; only the exit status is inspected.
    """
    probe = subprocess.run(
        ["heroku", "apps:info", app],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    exit_status = probe.returncode
    return exit_status == 0
|
|
|
def get_existing_app_info_if_any(app: str):
    """Return the app info for *app* if it exists, otherwise ``None``.

    A notice is printed when the app is found.
    """
    if not app_exists(app):
        return None

    print(f"ℹ App {app} already exists; resuming clone")
    return get_app_info(app)
|
|
|
def get_pipeline_and_stage_from_app_info(info: dict):
    """Extract the pipeline coupling of an app from its info document.

    Returns:
        A ``(pipeline_id, stage, pipeline_name)`` tuple; ``pipeline_name`` is
        ``None`` when the pipeline entry carries no ``name``.

    Exits with status 1 when ``pipeline_coupling`` is missing or lacks a
    pipeline or a stage.
    """
    coupling = info.get("pipeline_coupling") or {}
    pipeline = coupling.get("pipeline")
    stage = coupling.get("stage")

    if not (pipeline and stage):
        print("❌ Could not read pipeline_coupling from app info.")
        sys.exit(1)

    return pipeline["id"], stage, pipeline.get("name")
|
|
|
def ensure_app_pipeline_and_stage(app: str, expected_pipeline_id: str, expected_stage: str):
    """Verify that an existing app sits in the expected pipeline and stage.

    The app info is fetched through ``get_app_info``. The script exits with
    status 1 when the app has no pipeline coupling, or when its pipeline id or
    stage differs from the expected values; otherwise a notice is printed.
    """
    coupling = get_app_info(app).get("pipeline_coupling")

    if not coupling:
        print(f"❌ App {app} exists but is not attached to any pipeline.")
        sys.exit(1)

    actual_pipeline = coupling["pipeline"]["id"]
    actual_stage = coupling["stage"]
    matches = actual_pipeline == expected_pipeline_id and actual_stage == expected_stage

    if not matches:
        print(
            f"❌ App name conflict detected.\n"
            f" App '{app}' already exists in a different pipeline or stage.\n"
            f" Expected: pipeline={expected_pipeline_id}, stage={expected_stage}\n"
            f" Actual: pipeline={actual_pipeline}, stage={actual_stage}"
        )
        sys.exit(1)

    print(f"ℹ Existing app {app} is already in correct pipeline and stage; continuing")
|
|
|
|
|
def get_addons_from_app_info(info: dict):
    """Return the ``addons`` entry of an app info document, or ``[]`` if absent."""
    if "addons" in info:
        return info["addons"]
    return []
|
|
|
def get_team_from_app_info(info: dict):
    """Return the name of the team (or organization) that owns the app.

    The owner is looked up first at the top level of *info* and then under
    its ``app`` entry; in each place ``team`` takes precedence over
    ``organization``. If no named owner is found, the available top-level
    keys are printed and the script exits with status 1.
    """
    # Top level first, then the nested "app" object as a fallback.
    for holder in (info, info.get("app", {})):
        owner = holder.get("team") or holder.get("organization")
        if owner and owner.get("name"):
            return owner["name"]

    print("❌ Could not determine team/organization from app info.")
    print("🔎 Available keys:", sorted(info.keys()))
    sys.exit(1)
|
|
|
|
|
def create_app(app: str, team: str):
    """Create the Heroku app *app* inside *team* unless it already exists."""
    already_there = app_exists(app)

    if not already_there:
        print(f"▶ Creating app {app} in team {team}")
        run(["heroku", "apps:create", app, "--team", team])
    else:
        print(f"ℹ App {app} already exists; skipping creation")
|
|
|
def app_in_pipeline(app: str, pipeline_id: str) -> bool:
    """Tell whether *app* is currently coupled to the pipeline *pipeline_id*."""
    coupling = get_app_info(app).get("pipeline_coupling")
    if not coupling:
        return False

    coupled_id = coupling.get("pipeline", {}).get("id")
    return bool(coupled_id == pipeline_id)
|
|
|
def add_to_pipeline(app: str, pipeline_id: str, stage: str):
    """Attach *app* to a pipeline at *stage* unless it is already a member."""
    if app_in_pipeline(app, pipeline_id):
        print(f"ℹ App {app} already in pipeline; skipping")
        return

    print(f"▶ Adding {app} to pipeline (stage: {stage})")
    command = ["heroku", "pipelines:add", pipeline_id, "--app", app, "--stage", stage]
    run(command)
|
|
|
|
|
def copy_config(source_app: str, target_app: str, regen_keys: List[str], addon_config_vars: set):
    """Copy config vars from *source_app* to *target_app*.

    Args:
        source_app: App whose config vars are read.
        target_app: App that receives the config vars.
        regen_keys: Names of config vars whose values are replaced by freshly
            generated ones (see ``regenerate_value``) instead of being copied.
        addon_config_vars: Names of vars that are never copied because they
            are managed by add-ons.

    Keys already present on the target are left untouched, so the function
    can be re-run after an interruption. All remaining vars are applied with a
    single ``heroku config:set`` invocation rather than one invocation per
    variable. On failure the script exits with status 1.
    """
    print("▶ Copying config vars")
    config = run(
        ["heroku", "config", "--app", source_app, "--json"],
        capture_json=True,
    )
    print(f"ℹ Skipping {len(addon_config_vars)} addon-managed config vars")
    existing_keys = get_config_keys(target_app)
    regen = set(regen_keys)

    keys_to_set = []
    assignments = []
    for key, value in config.items():
        if key in addon_config_vars or key in existing_keys:
            continue
        if key in regen:
            print(f"🔐 Regenerating {key}")
            value = regenerate_value(value)
        keys_to_set.append(key)
        assignments.append(f"{key}={value}")

    if not assignments:
        print("ℹ No config vars to copy")
        return

    # The command is executed here rather than through run() because run()
    # echoes the full command line on failure, and this command line carries
    # secret values. Only the key names are reported.
    result = subprocess.run(
        ["heroku", "config:set", *assignments, "--app", target_app],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    if result.returncode != 0:
        print(f"\n❌ Failed to set config vars on {target_app}: {', '.join(keys_to_set)}")
        print(result.stderr.strip())
        sys.exit(1)
|
|
|
def get_config_keys(app: str) -> set:
    """Return the names of all config vars currently set on *app*."""
    command = ["heroku", "config", "--app", app, "--json"]
    current = run(command, capture_json=True)
    return {name for name in current.keys()}
|
|
|
def recreate_addons(addons: list, target_app: str, auto_confirm: bool):
    """Create on *target_app* the add-ons described in *addons*.

    Args:
        addons: Add-on descriptions; each entry is read via
            ``addon["addon_service"]["name"]`` and ``addon["plan"]["name"]``.
        target_app: App that receives the add-ons.
        auto_confirm: When false, the user is asked before each creation.

    Services already attached to the target are skipped, which makes the
    function safe to re-run. ``heroku-postgresql`` is never created here.
    If a creation fails, the CLI error and a resume hint are printed and the
    script exits with status 1.
    """
    print("▶ Recreating addons")
    existing = get_existing_addon_services(target_app)

    for addon in addons:
        service = addon["addon_service"]["name"]
        plan = addon["plan"]["name"]

        # Plan names may arrive already qualified as "service:plan" (the form
        # shown in the Heroku Platform API reference). Prefix the service only
        # when it is missing, so the CLI never receives "service:service:plan".
        if plan.startswith(f"{service}:"):
            spec = plan
        else:
            spec = f"{service}:{plan}"

        if service in existing:
            print(f"ℹ Addon {service} already exists; skipping")
            continue

        if service == "heroku-postgresql":
            continue

        if not auto_confirm:
            if not confirm(f"Create addon {spec}?"):
                print(f"⏭ Skipping {spec}")
                continue

        print(f"▶ Creating addon {spec}")
        result = subprocess.run(
            ["heroku", "addons:create", spec, "--app", target_app],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        if result.returncode != 0:
            print(f"\n❌ Failed to create addon {spec}")
            print(result.stderr.strip())
            print(
                "\nℹ This failure is safe to resume from.\n"
                " Fix the issue (billing, marketplace terms, permissions)\n"
                " then re-run this script to continue."
            )
            sys.exit(1)
|
|
|
def get_existing_addon_services(app: str) -> set:
    """Return the service names of all add-ons listed in the info of *app*."""
    services = set()
    for addon in get_app_info(app).get("addons", []):
        services.add(addon["addon_service"]["name"])
    return services
|
|
|
def restore_postgres(source_app: str, target_app: str, force: bool):
    """Restore the source app's latest Postgres backup into the target app.

    Nothing is restored when the target already lists a ``heroku-postgresql``
    add-on, unless *force* is set. The backup identifier is whatever
    ``heroku pg:backups:latest`` prints for the source app; an empty result
    means there is nothing to restore.
    """
    # NOTE(review): the restore only runs unforced when the target has no
    # Postgres add-on listed — confirm that this is the intended condition.
    target_services = [
        addon["addon_service"]["name"]
        for addon in get_app_info(target_app).get("addons", [])
    ]
    has_postgres = "heroku-postgresql" in target_services

    if has_postgres:
        if not force:
            print("ℹ Postgres already present; skipping restore")
            return
        print("⚠ Forcing Postgres restore on existing database")

    print("▶ Restoring Postgres from latest backup")

    # A failing lookup is tolerated; it is expected to leave the output empty.
    lookup = ["heroku", "pg:backups:latest", "--app", source_app]
    backup = run(lookup, allow_fail=True)

    if not backup:
        print("⚠ No Postgres backup found — skipping restore")
        return

    restore = ["heroku", "pg:backups:restore", backup, "DATABASE_URL"]
    restore += ["--app", target_app, "--confirm", target_app]
    run(restore)
|
|
|
|
|
# -----------------------------
# CLI
# -----------------------------
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Assemble the command-line interface of the clone tool."""
    parser = argparse.ArgumentParser(
        description="Clone a Heroku app (config, addons, pipeline, postgres)"
    )

    for positional in ("source_app", "new_app"):
        parser.add_argument(positional)

    parser.add_argument(
        "--stage",
        choices=["development", "staging", "production"],
        help="Override pipeline stage (default: inherit from source app)",
    )
    parser.add_argument(
        "--regen-key",
        action="append",
        default=[],
        help="Config key to regenerate (repeatable)",
    )
    parser.add_argument(
        "--auto-confirm-addons",
        action="store_true",
        help="Do not prompt before creating addons",
    )
    parser.add_argument(
        "--force-postgres-restore",
        action="store_true",
        help="Force Postgres restore even if a database already exists on the target app",
    )
    return parser


def main():
    """Clone a Heroku app: pipeline membership, config vars, add-ons, Postgres.

    Steps, in order: read the source app info; create the new app and add it
    to the source's pipeline (or, if it already exists, verify its pipeline
    and stage); copy config vars; recreate add-ons; restore Postgres.
    """
    args = _build_parser().parse_args()
    source, target = args.source_app, args.new_app

    print(f"\n🚀 Cloning {source} → {target}\n")

    app_info = get_app_info(source)
    pipeline_id, source_stage, _pipeline_name = get_pipeline_and_stage_from_app_info(app_info)
    team = get_team_from_app_info(app_info)
    addons = get_addons_from_app_info(app_info)

    # Vars owned by add-ons must not be copied to the new app.
    addon_config_vars = {
        var
        for addon in addons
        for var in addon.get("config_vars", [])
    }

    target_stage = args.stage or source_stage
    if args.stage:
        print(f"ℹ Overriding pipeline stage: {source_stage} → {target_stage}")

    if not app_exists(target):
        create_app(target, team)
        add_to_pipeline(target, pipeline_id, target_stage)
    else:
        ensure_app_pipeline_and_stage(target, pipeline_id, target_stage)

    copy_config(
        source,
        target,
        regen_keys=args.regen_key,
        addon_config_vars=addon_config_vars,
    )
    recreate_addons(addons, target, auto_confirm=args.auto_confirm_addons)
    restore_postgres(source, target, force=args.force_postgres_restore)

    print("\n✅ Clone completed successfully")
    print("🔎 Verify app, credentials, and billing dashboard\n")
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()