Skip to content

Instantly share code, notes, and snippets.

@haseeb-heaven
Last active May 7, 2026 23:11
Show Gist options
  • Select an option

  • Save haseeb-heaven/0ee566316ec0c94128529477e0a3f071 to your computer and use it in GitHub Desktop.

Select an option

Save haseeb-heaven/0ee566316ec0c94128529477e0a3f071 to your computer and use it in GitHub Desktop.
A reliable GitHub migration engine that migrates pull requests, issues, comments, descriptions, and more.
#!/usr/bin/env python3
"""
github_migration_engine.py -- Standalone GitHub Migration Tool
================================================================
Zero external dependencies beyond: requests, python-dotenv, tqdm
Install: pip install requests python-dotenv tqdm
.env file (create alongside this script):
GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxx
SRC_REPO=owner/source-repo
DEST_REPO=owner/destination-repo
SRC_BRANCH=main
DEST_BRANCH=main
What gets migrated (in order):
1. Git -- all code, branches, tags (force-push)
2. Labels
3. Milestones
4. Issues (with original author/date attribution header)
5. Issue comments
6. Pull Requests (fallback to placeholder issue if branch missing)
7. PR review comments (preserved as issue comments with file/line info)
8. Releases
9. [Optional] Rename swap (src -> src-old, dest -> src name)
10. [Optional] Archive old repo
11. Verify
"""
import os
import sys
import time
import random
import subprocess
import logging
from pathlib import Path
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
import requests
from dotenv import load_dotenv
from tqdm import tqdm
# Console logging: timestamped, level-tagged lines at INFO and above.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
log = logging.getLogger("migration")

# Load settings from a .env file into the process environment.
load_dotenv()

# Required settings (checked by validate()).
TOKEN = os.getenv("GITHUB_TOKEN")
SRC = os.getenv("SRC_REPO")
DEST = os.getenv("DEST_REPO")
SRC_BRANCH = os.getenv("SRC_BRANCH")
DEST_BRANCH = os.getenv("DEST_BRANCH")

# Thread-pool sizes for comment and pull-request migration (default: 4 each).
MAX_COMMENT_WORKERS = int(os.getenv("MAX_COMMENT_WORKERS", "4"))
MAX_PR_WORKERS = int(os.getenv("MAX_PR_WORKERS", "4"))

# Guards console output written from worker threads (used by ts_print).
PRINT_LOCK = Lock()
def _slug(value: str) -> str:
value = value.strip().rstrip("/")
if value.startswith("https://github.com/"):
value = value[len("https://github.com/"):]
return value
# Normalise the repo settings only when they are present: a missing variable
# must reach validate() as a readable error instead of crashing at import time.
SRC = _slug(SRC) if SRC else ""
DEST = _slug(DEST) if DEST else ""
# Local clone used by the git steps, named after the destination repository.
WORK_DIR = Path("./" + DEST.split("/")[-1] + "_migration") if DEST else Path("./migration_workdir")
# Authenticated push URL; empty when the token or destination is not configured.
DEST_URL = "https://x-access-token:" + TOKEN + "@github.com/" + DEST + ".git" if TOKEN and DEST else ""
# Headers sent with every REST call. The token falls back to "" so building the
# dict cannot raise before validate() has reported a missing GITHUB_TOKEN.
HEADERS = {
    "Authorization": "Bearer " + (TOKEN or ""),
    "Accept": "application/vnd.github+json",
    "X-GitHub-Api-Version": "2022-11-28",
}
# ---------------------------------------------------------------------------
# CLI helpers
# ---------------------------------------------------------------------------
def confirm(msg: str) -> bool:
    """Ask a yes/no question on stdin.

    Returns True only when the stripped, lower-cased answer is ``y`` or
    ``yes``; any other input counts as "no".
    """
    prompt = "\n\u26a0\ufe0f " + msg + " [Y/N]: "
    answer = input(prompt).strip().lower()
    return answer in ("y", "yes")
def run(cmd: str, cwd=None, check: bool = True) -> str:
    """Run a shell command and return its stripped standard output.

    The command is echoed before execution with any ``user:password@`` URL
    credentials masked, so a token embedded in a remote URL is not written to
    the console. The same masking is applied to the error message.

    Args:
        cmd: Command line, executed through the shell.
        cwd: Working directory for the command (``None`` = current directory).
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        The command's stdout with surrounding whitespace removed (not masked,
        because callers parse it).

    Raises:
        RuntimeError: If ``check`` is true and the command exits non-zero. The
            message is the command's stderr, or its stdout when stderr is empty.
    """
    import re  # local import keeps this helper self-contained

    def _mask(text: str) -> str:
        # https://user:secret@host/...  ->  https://***@host/...
        return re.sub(r"(https?://)[^/\s:@]+:[^/\s@]+@", r"\1***@", text)

    print("$ " + _mask(cmd))
    result = subprocess.run(cmd, shell=True, cwd=cwd, capture_output=True, text=True)
    if check and result.returncode != 0:
        raise RuntimeError(_mask(result.stderr.strip() or result.stdout.strip()))
    return result.stdout.strip()
def ts_print(*args, **kwargs):
    """Thread-safe ``print``: the call is made while holding PRINT_LOCK so
    output from concurrent workers is not interleaved."""
    PRINT_LOCK.acquire()
    try:
        print(*args, **kwargs)
    finally:
        PRINT_LOCK.release()
# ---------------------------------------------------------------------------
# GitHub API helpers
# ---------------------------------------------------------------------------
def safe_json(resp: requests.Response):
    """Decode a response body as JSON.

    Returns ``{}`` when the body is empty, whitespace-only, or not valid JSON.
    """
    text = resp.text
    if text and text.strip():
        try:
            return resp.json()
        except ValueError:
            pass
    return {}
def is_rate_limited(resp: requests.Response) -> bool:
    """Report whether a response is a rate-limit rejection.

    True when the status is 403 or 429 and either the
    ``x-ratelimit-remaining`` header is ``"0"`` or the error message mentions
    a rate limit.
    """
    if resp.status_code not in (403, 429):
        return False
    if resp.headers.get("x-ratelimit-remaining") == "0":
        return True
    parsed = safe_json(resp)
    if isinstance(parsed, dict):
        message = str(parsed.get("message", "")).lower()
    else:
        message = (resp.text or "").lower()
    return "rate limit" in message or "secondary rate limit" in message
def _retry_wait(attempt: int, resp: Optional[requests.Response] = None) -> float:
    """Return how many seconds to sleep before the next retry.

    Order of preference:
      1. a numeric ``Retry-After`` header on the response (sent with secondary
         rate limits), honoured as-is;
      2. for rate-limited responses, the time left until ``x-ratelimit-reset``;
      3. exponential backoff of ``2 ** attempt`` seconds.

    A random jitter of 0-1 s is added in every case, and server-provided waits
    are clamped to at least one second.

    Args:
        attempt: Zero-based retry attempt number.
        resp: The failed response, if one was received.
    """
    jitter = random.uniform(0, 1)
    if resp is not None:
        retry_after = resp.headers.get("retry-after")
        if retry_after:
            try:
                return max(1, int(retry_after)) + jitter
            except ValueError:
                pass  # not a plain number of seconds; try the other strategies
        if is_rate_limited(resp):
            reset = resp.headers.get("x-ratelimit-reset")
            if reset:
                try:
                    return max(1, int(reset) - int(time.time())) + jitter
                except ValueError:
                    pass
    return (2 ** attempt) + jitter
def gh(method: str, path: str, payload=None, retries: int = 6):
    """Make a GitHub REST API call with automatic retry and rate-limit handling.

    Timeouts, network errors, rate-limit rejections and 500/502/503/504
    responses are retried up to ``retries`` attempts, sleeping for
    ``_retry_wait`` between attempts. No sleep is performed after the final
    attempt, since nothing follows it.

    Args:
        method: HTTP method name.
        path: API path beginning with ``/`` (appended to https://api.github.com).
        payload: Optional JSON body.
        retries: Maximum number of attempts.

    Returns:
        The decoded JSON body (``{}`` when the body is empty or not JSON).

    Raises:
        RuntimeError: Immediately for any other 4xx/5xx response
            (``"<status>: <body>"``), or once the attempts are exhausted.
    """
    url = "https://api.github.com" + path
    last_err = None
    for attempt in range(retries):
        resp = None
        try:
            resp = requests.request(method, url, headers=HEADERS, json=payload, timeout=(10, 30))
        except requests.Timeout as exc:
            last_err = "Timeout: " + str(exc)
            warning = ("Timeout -- retrying in %.1fs", ())
        except requests.RequestException as exc:
            last_err = str(exc)
            warning = ("Network error -- retrying in %.1fs", ())
        else:
            if resp.status_code < 400:
                return safe_json(resp)
            last_err = str(resp.status_code) + ": " + resp.text
            if is_rate_limited(resp):
                warning = ("Rate limited -- waiting %.1fs", ())
            elif resp.status_code in (500, 502, 503, 504):
                warning = ("Server error %s -- retrying in %.1fs", (resp.status_code,))
            else:
                # Any other client/server error is not worth retrying.
                raise RuntimeError(last_err)
        if attempt == retries - 1:
            break  # out of attempts: fail now rather than sleep first
        wait = _retry_wait(attempt, resp)
        fmt, fmt_args = warning
        log.warning(fmt, *fmt_args, wait)
        time.sleep(wait)
    raise RuntimeError(last_err or "Max retries exceeded")
def paginate(path: str) -> list:
    """Collect every item from a paginated GitHub list endpoint.

    Requests pages of 100 items, starting at page 1, until an empty page or a
    page shorter than 100 items is returned.

    Raises:
        RuntimeError: If a non-empty response is not a list.
    """
    joiner = "&" if "?" in path else "?"
    collected = []
    page_no = 1
    while True:
        chunk = gh("GET", path + joiner + "per_page=100&page=" + str(page_no))
        if not chunk:
            return collected
        if not isinstance(chunk, list):
            raise RuntimeError("Expected list response from: " + path)
        collected.extend(chunk)
        if len(chunk) < 100:
            return collected
        page_no += 1
def _ensure_label(repo: str, name: str, color: str = "0075ca", description: str = ""):
    """Create a label in ``repo`` on a best-effort basis.

    A failed creation never aborts the migration, but it is logged instead of
    being silently discarded: at debug level when the error text indicates the
    label already exists, at warning level otherwise.

    Args:
        repo: Target repository as ``owner/repo``.
        name: Label name.
        color: Six-digit hex colour without the leading ``#``.
        description: Optional label description.
    """
    try:
        gh("POST", "/repos/" + repo + "/labels",
           {"name": name, "color": color, "description": description})
    except RuntimeError as exc:
        if "already_exists" in str(exc):
            log.debug("Label %r already exists in %s", name, repo)
        else:
            log.warning("Could not create label %r in %s: %s", name, repo, exc)
def _attribution_header(original_url: str, author: str, created_at: str) -> str:
return (
"> **Migrated from:** [" + original_url + "](" + original_url + ") \n"
"> **Original author:** @" + author + " \n"
"> **Created at:** " + created_at + "\n\n"
)
# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------
def validate():
    """Check the required settings.

    Every problem found is logged at error level; if there is at least one,
    ``RuntimeError`` is raised. On success the source/destination pair is
    logged at info level.
    """
    checks = [
        (not TOKEN, "GITHUB_TOKEN is required in .env"),
        (not SRC or not DEST, "SRC_REPO and DEST_REPO are required in .env"),
        (not SRC_BRANCH or not DEST_BRANCH, "SRC_BRANCH and DEST_BRANCH are required in .env"),
        (bool(SRC) and "/" not in SRC, "SRC_REPO must be owner/repo format or full GitHub URL"),
        (bool(DEST) and "/" not in DEST, "DEST_REPO must be owner/repo format or full GitHub URL"),
    ]
    problems = [text for failed, text in checks if failed]
    if problems:
        for problem in problems:
            log.error(problem)
        raise RuntimeError("Fix .env errors above and retry.")
    log.info("Config validated: %s -> %s", SRC, DEST)
# ---------------------------------------------------------------------------
# PRECHECK
# ---------------------------------------------------------------------------
def check_branch_protection(repo: str, branch: str):
    """Warn about (or rule out) branch protection before a force-push.

    * Protection endpoint answers: the branch is protected, ask to continue.
    * Error text contains "404": no protection, carry on.
    * Error text contains "403": status unknown, ask to continue.
    * Any other API error is re-raised.

    Raises:
        RuntimeError: If the user declines to continue, or on another API error.
    """
    target = repo + ":" + branch
    try:
        gh("GET", "/repos/" + repo + "/branches/" + branch + "/protection")
    except RuntimeError as exc:
        detail = str(exc).lower()
        if "404" in detail:
            print(" ✅ No branch protection on " + target)
        elif "403" in detail:
            print(" ⚠️ Cannot inspect branch protection for " + target + " (permissions)")
            if not confirm("Branch protection status unknown. Continue anyway?"):
                raise RuntimeError("Aborted: branch protection could not be verified.")
        else:
            raise
        return
    print(" ⚠️ Branch protection detected on " + target)
    if not confirm(branch + " appears protected. Force-push may fail. Continue anyway?"):
        raise RuntimeError("Aborted due to branch protection.")
def preflight_dest():
    """Inspect the destination repository before migrating.

    Confirms the repository is reachable, reports any existing labels, and
    asks for confirmation when issues, pull requests or releases already
    exist, because a rerun would duplicate them.

    Raises:
        RuntimeError: If the user declines to continue.
    """
    print("\n[PRECHECK] Destination repository")
    dest_info = gh("GET", "/repos/" + DEST)
    print(" ✅ Destination found: " + dest_info["full_name"])
    base = "/repos/" + DEST
    # The issues endpoint also lists pull requests; keep genuine issues only.
    issues = [item for item in paginate(base + "/issues?state=all") if "pull_request" not in item]
    prs = paginate(base + "/pulls?state=all")
    releases = paginate(base + "/releases")
    labels = paginate(base + "/labels")
    if labels:
        print(" ℹ️ Destination already has " + str(len(labels)) + " labels.")
    existing = (("issues", issues), ("pull requests", prs), ("releases", releases))
    for kind, items in existing:
        if items:
            print(" ⚠️ Destination already has " + str(len(items)) + " " + kind + ".")
    has_metadata = bool(issues or prs or releases)
    if has_metadata and not confirm("Destination has existing metadata -- reruns will duplicate. Continue?"):
        raise RuntimeError("Aborted: destination not empty.")
# ---------------------------------------------------------------------------
# STEP 1: Git
# ---------------------------------------------------------------------------
def step_git():
    """Mirror the source repository's code into the destination.

    After confirmation and a branch-protection check this:
      1. clones the destination into WORK_DIR (or resets an existing clone);
      2. adds/updates a ``source`` remote and fetches it with tags;
      3. force-pushes SRC_BRANCH as DEST_BRANCH, then every other source
         branch, then all tags;
      4. sets the destination's default branch to DEST_BRANCH.

    Branch names come from the remote repository and are interpolated into
    shell commands, so every ref is passed through ``shlex.quote``.

    Raises:
        RuntimeError: If the user aborts at the protection prompt, or a git
            command or API call fails.
    """
    import shlex  # local import: quotes ref names handed to the shell

    print("\n[STEP 1] Git migration: " + SRC + " -> " + DEST)
    if not confirm("Force-overwrite destination repo code / branches / tags?"):
        return
    check_branch_protection(DEST, DEST_BRANCH)
    if not WORK_DIR.exists():
        run("git clone --filter=blob:none \"" + DEST_URL + "\" \"" + str(WORK_DIR) + "\"")
    else:
        print(" Cleaning existing workdir...")
        run("git reset --hard", cwd=WORK_DIR)
        run("git clean -fd", cwd=WORK_DIR)
        run("git fetch origin --prune", cwd=WORK_DIR)
    src_remote = "https://github.com/" + SRC + ".git"
    try:
        run("git remote add source \"" + src_remote + "\"", cwd=WORK_DIR)
    except RuntimeError:
        # The remote already exists from an earlier run: just repoint it.
        run("git remote set-url source \"" + src_remote + "\"", cwd=WORK_DIR)
    run("git fetch source --prune --tags", cwd=WORK_DIR)
    dest_branch = shlex.quote(DEST_BRANCH)
    run("git checkout -B " + dest_branch + " " + shlex.quote("source/" + SRC_BRANCH), cwd=WORK_DIR)
    run("git push origin " + dest_branch + " --force", cwd=WORK_DIR)
    for line in run("git branch -r", cwd=WORK_DIR).splitlines():
        line = line.strip()
        if not line.startswith("source/") or "HEAD" in line:
            continue
        branch = line.removeprefix("source/")
        if branch == SRC_BRANCH:
            continue  # already pushed above as DEST_BRANCH
        refspec = "refs/remotes/source/" + branch + ":refs/heads/" + branch
        run("git push origin " + shlex.quote(refspec) + " --force", cwd=WORK_DIR)
    run("git push origin --tags --force", cwd=WORK_DIR)
    gh("PATCH", "/repos/" + DEST, {"default_branch": DEST_BRANCH})
    print(" ✅ Git migration complete")
# ---------------------------------------------------------------------------
# STEP 1.5: Collaborators
# ---------------------------------------------------------------------------
def step_collaborators():
    """Invite each source collaborator to the destination with ``push`` access.

    The destination owner is skipped. A failed invitation is logged as a
    warning and does not stop the remaining invitations.
    """
    print("\n[STEP 1.5] Collaborators")
    owner_login = DEST.split("/")[0].lower()
    logins = [entry["login"] for entry in paginate("/repos/" + SRC + "/collaborators")]
    for login in logins:
        if login.lower() == owner_login:
            # Inviting the repository owner is rejected by the API.
            continue
        print(" Inviting " + login + " to " + DEST + "...")
        try:
            # 'push' is write access; use 'maintain' or 'admin' if more is needed.
            gh("PUT", "/repos/" + DEST + "/collaborators/" + login, {"permission": "push"})
        except Exception as e:
            log.warning("Could not invite " + login + ": " + str(e))
    print(" ✅ Collaborator invitations processed")
def step_about():
    """Copy the description, homepage and topics from source to destination.

    The two updates are independent: a failure in either is logged as a
    warning and the other is still attempted.
    """
    print("\n[STEP 1.6] About / Homepage / Topics")
    source_info = gh("GET", "/repos/" + SRC)
    about = {
        "description": source_info.get("description") or "",
        "homepage": source_info.get("homepage") or "",
    }
    topics_reply = gh("GET", "/repos/" + SRC + "/topics")
    if isinstance(topics_reply, dict):
        topics = topics_reply.get("names", [])
    else:
        topics = []

    try:
        gh("PATCH", "/repos/" + DEST, about)
    except RuntimeError as exc:
        log.warning("Could not update description/homepage: %s", exc)
    else:
        print(" ✅ Description/homepage updated")

    try:
        gh("PUT", "/repos/" + DEST + "/topics", {"names": topics})
    except RuntimeError as exc:
        log.warning("Could not update topics: %s", exc)
    else:
        print(" ✅ Topics updated (" + str(len(topics)) + ")")
def step_contributors():
    """Push the source branch's history to the destination as ``bots-<branch>``.

    Inside the existing work directory this adds (or repoints) a ``bots``
    remote for the source repository, fetches it, creates a local
    ``bots-<SRC_BRANCH>`` branch, merges ``bots/<SRC_BRANCH>`` into it and
    force-pushes the result to ``origin``. The branch is intentionally not
    merged into DEST_BRANCH.

    Any git failure is logged as a warning; the step never aborts the run.
    """
    print("\n[STEP 1.7] Migrate bot contributors from SRC_REPO")
    if not WORK_DIR.exists():
        log.error("Workdir does not exist; run step_git first.")
        return
    bots_url = "https://github.com/" + SRC + ".git"
    try:
        try:
            run("git remote add bots \"" + bots_url + "\"", cwd=WORK_DIR)
        except RuntimeError:
            # The remote is left over from an earlier run: repoint it so a
            # rerun does not fail here.
            run("git remote set-url bots \"" + bots_url + "\"", cwd=WORK_DIR)
        run("git fetch bots --prune", cwd=WORK_DIR)
        # Slashes in the branch name are flattened for the local branch name.
        bots_local_branch = "bots-" + SRC_BRANCH.replace("/", "-")
        run("git checkout -B " + bots_local_branch, cwd=WORK_DIR)
        run("git merge --allow-unrelated-histories bots/" + SRC_BRANCH, cwd=WORK_DIR)
        run("git push origin " + bots_local_branch + " --force", cwd=WORK_DIR)
        print(" ✅ Bot contributors (from SRC_REPO branch) are now in repo; GitHub will show them as contributors.")
    except RuntimeError as exc:
        log.warning("Could not migrate bot contributors via git: %s", exc)
# ---------------------------------------------------------------------------
# STEP 2: Labels
# ---------------------------------------------------------------------------
def step_labels():
    """Create in the destination every source label whose name is not already there."""
    print("\n[STEP 2] Labels")
    source_labels = paginate("/repos/" + SRC + "/labels")
    present = {entry["name"] for entry in paginate("/repos/" + DEST + "/labels")}
    missing = [entry for entry in source_labels if entry["name"] not in present]
    for entry in missing:
        _ensure_label(
            DEST,
            entry["name"],
            entry.get("color", "0075ca"),
            entry.get("description") or "",
        )
    print(" ✅ Labels migrated (" + str(len(missing)) + " new, " + str(len(present)) + " already existed)")
# ---------------------------------------------------------------------------
# STEP 3: Milestones
# ---------------------------------------------------------------------------
def step_milestones() -> dict:
    """Copy milestones and return ``{source number: destination number}``.

    A source milestone whose title already exists in the destination is mapped
    to that existing milestone; otherwise a new one is created with the same
    title, description, state and (when set) due date.
    """
    print("\n[STEP 3] Milestones")
    source_milestones = paginate("/repos/" + SRC + "/milestones?state=all")
    by_title = {}
    for existing in paginate("/repos/" + DEST + "/milestones?state=all"):
        by_title[existing["title"]] = existing["number"]

    mapping: dict = {}
    for ms in source_milestones:
        title = ms["title"]
        if title in by_title:
            mapping[ms["number"]] = by_title[title]
        else:
            payload: dict = {
                "title": title,
                "description": ms.get("description") or "",
                "state": ms.get("state", "open"),
            }
            due = ms.get("due_on")
            if due:
                payload["due_on"] = ms["due_on"]
            mapping[ms["number"]] = gh("POST", "/repos/" + DEST + "/milestones", payload)["number"]
    print(" ✅ Milestones migrated (" + str(len(mapping)) + " total)")
    return mapping
# ---------------------------------------------------------------------------
# STEP 4: Issues
# ---------------------------------------------------------------------------
def step_issues(milestone_map: dict) -> dict:
    """Recreate source issues (oldest first) and return ``{source number: destination number}``.

    Each new issue carries an attribution header, the original labels, the
    mapped milestone (when one exists in ``milestone_map``) and the original
    assignees. Issues that are closed in the source are closed after creation.
    """
    print("\n[STEP 4] Issues")
    listing = paginate("/repos/" + SRC + "/issues?state=all&sort=created&direction=asc")
    # The issues endpoint also returns pull requests; those are handled separately.
    src_issues = [entry for entry in listing if "pull_request" not in entry]
    print(" Found " + str(len(src_issues)) + " issues in source")

    issue_map: dict = {}
    for issue in src_issues:
        text = issue.get("body") or "*(no description)*"
        payload: dict = {
            "title": issue["title"],
            "body": _attribution_header(issue["html_url"], issue["user"]["login"], issue["created_at"]) + text,
            "labels": [lbl["name"] for lbl in issue.get("labels", [])],
        }
        milestone = issue.get("milestone")
        if milestone and milestone["number"] in milestone_map:
            payload["milestone"] = milestone_map[milestone["number"]]
        assignees = issue.get("assignees")
        if assignees:
            payload["assignees"] = [person["login"] for person in assignees]

        created = gh("POST", "/repos/" + DEST + "/issues", payload)
        dest_number = created["number"]
        issue_map[issue["number"]] = dest_number
        if issue["state"] == "closed":
            gh("PATCH", "/repos/" + DEST + "/issues/" + str(dest_number), {"state": "closed"})
        print(" #" + str(issue["number"]).rjust(4) + " -> #" + str(dest_number).ljust(4) + " " + issue["title"][:60])
        time.sleep(0.3)  # small pause between creations
    print(" ✅ Issues migrated (" + str(len(issue_map)) + ")")
    return issue_map
# ---------------------------------------------------------------------------
# STEP 5: Issue comments
# ---------------------------------------------------------------------------
def step_issue_comments(issue_map: dict):
    """Copy issue comments, preserving their order within each issue.

    Work is parallelised across issues only: all comments of one issue are
    posted sequentially by a single worker, in the order the source API lists
    them, so conversations are not scrambled in the destination.

    Args:
        issue_map: ``{source issue number: destination issue number}``.

    Raises:
        RuntimeError: If posting any comment fails.
    """
    print("\n[STEP 5] Issue comments")
    threads = []
    for src_num, dest_num in issue_map.items():
        comments = paginate("/repos/" + SRC + "/issues/" + str(src_num) + "/comments")
        if comments:
            threads.append((dest_num, comments))

    def copy_thread(thread) -> int:
        """Post one issue's comments in order; return how many were posted."""
        dest_num, comments = thread
        for c in comments:
            header = _attribution_header(c["html_url"], c["user"]["login"], c["created_at"])
            body = header + (c.get("body") or "*(empty comment)*")
            gh("POST", "/repos/" + DEST + "/issues/" + str(dest_num) + "/comments", {"body": body})
        return len(comments)

    total = 0
    with ThreadPoolExecutor(max_workers=MAX_COMMENT_WORKERS) as executor:
        futures = [executor.submit(copy_thread, thread) for thread in threads]
        for future in as_completed(futures):
            total += future.result()  # re-raises a worker's failure
    print(" ✅ Issue comments migrated (" + str(total) + ")")
# ---------------------------------------------------------------------------
# STEP 6: Pull Requests
# ---------------------------------------------------------------------------
def _find_existing_pr(repo: str, head: str, base: str):
    """Look up an open pull request for ``head`` -> ``base`` in ``repo``.

    The head is qualified with the repository owner (``owner:head``). Returns
    the first match, or ``None`` when there is none.
    """
    owner = repo.split("/")[0]
    query = "?state=open&head=" + owner + ":" + head + "&base=" + base
    matches = paginate("/repos/" + repo + "/pulls" + query)
    if matches:
        return matches[0]
    return None
def step_pull_requests(issue_map: dict, milestone_map: dict) -> dict:
    """Recreate source pull requests in the destination repository.

    For every source PR (processed by a thread pool):
      * if the head or base branch is missing in the destination, a
        placeholder issue is created instead;
      * if an open PR for the same head/base already exists, it is updated
        and reused;
      * otherwise a new PR is created, falling back to a placeholder issue
        when creation fails.

    Labels and the mapped milestone are applied through the issues endpoint
    after the PR exists, because the pull-request creation call does not
    accept them. PRs that are closed in the source are closed afterwards.

    Args:
        issue_map: Source-to-destination issue numbers (part of the step
            interface; not read here).
        milestone_map: ``{source milestone number: destination milestone number}``.

    Returns:
        ``{source PR number: destination PR or placeholder-issue number}``.
    """
    print("\n[STEP 6] Pull Requests")
    src_prs = paginate("/repos/" + SRC + "/pulls?state=all&sort=created&direction=asc")
    print(" Found " + str(len(src_prs)) + " PRs in source")
    dest_branches = {b["name"] for b in paginate("/repos/" + DEST + "/branches")}
    pr_map: dict = {}
    pr_map_lock = Lock()
    created_prs = 0
    placeholder_issues = 0
    skipped = 0
    counters_lock = Lock()

    def dest_milestone(pr):
        """Return the destination milestone number for ``pr``, or None."""
        milestone = pr.get("milestone")
        if milestone and milestone["number"] in milestone_map:
            return milestone_map[milestone["number"]]
        return None

    def apply_metadata(pr, dest_number):
        """Best-effort copy of labels and milestone onto an existing destination PR."""
        update: dict = {}
        labels = [lbl["name"] for lbl in pr.get("labels", [])]
        if labels:
            update["labels"] = labels
        milestone = dest_milestone(pr)
        if milestone is not None:
            update["milestone"] = milestone
        if not update:
            return
        try:
            gh("PATCH", "/repos/" + DEST + "/issues/" + str(dest_number), update)
        except RuntimeError as exc:
            log.warning("PR #%s: could not copy labels/milestone: %s", pr["number"], exc)

    def create_issue_placeholder(pr, head_branch, base_branch, body, reason: str):
        """Create an issue standing in for a PR that cannot be recreated."""
        payload = {
            "title": "[PR] " + pr["title"],
            "body": body + (
                "\n\n---\n"
                "> ⚠️ **PR migration fallback:** This pull request could not be recreated as a real PR.\n"
                "> **Reason:** " + reason + "\n"
                "> **Original head:** `" + head_branch + "`\n"
                "> **Original base:** `" + base_branch + "`"
            ),
            "labels": [lbl["name"] for lbl in pr.get("labels", [])],
        }
        milestone = dest_milestone(pr)
        if milestone is not None:
            payload["milestone"] = milestone
        new_issue = gh("POST", "/repos/" + DEST + "/issues", payload)
        if pr["state"] == "closed":
            gh("PATCH", "/repos/" + DEST + "/issues/" + str(new_issue["number"]), {"state": "closed"})
        return new_issue

    def process_pr(pr):
        nonlocal created_prs, placeholder_issues, skipped
        head_branch = pr["head"]["ref"]
        base_branch = pr["base"]["ref"]
        header = _attribution_header(pr["html_url"], pr["user"]["login"], pr["created_at"])
        body = header + (pr.get("body") or "*(no description)*")

        # Case 1: a branch is missing, so a real PR is impossible.
        missing = [b for b in (head_branch, base_branch) if b not in dest_branches]
        if missing:
            try:
                new_issue = create_issue_placeholder(
                    pr, head_branch, base_branch, body,
                    "Missing destination branch(es): " + ", ".join(missing)
                )
                with pr_map_lock:
                    pr_map[pr["number"]] = new_issue["number"]
                with counters_lock:
                    placeholder_issues += 1
                ts_print(
                    " PR #" + str(pr["number"]).rjust(4) +
                    " -> issue #" + str(new_issue["number"]).ljust(4) +
                    " [placeholder] " + pr["title"][:48]
                )
            except RuntimeError as exc:
                log.warning("PR #%s failed placeholder fallback: %s", pr["number"], exc)
                with counters_lock:
                    skipped += 1
                ts_print(
                    " PR #" + str(pr["number"]).rjust(4) +
                    " -> SKIPPED [missing branches + placeholder failed] " +
                    pr["title"][:40]
                )
            return

        # Case 2: an open PR for this head/base already exists, so reuse it.
        existing = _find_existing_pr(DEST, head_branch, base_branch)
        if existing:
            gh(
                "PATCH", "/repos/" + DEST + "/pulls/" + str(existing["number"]),
                {"title": pr["title"], "body": body}
            )
            apply_metadata(pr, existing["number"])
            if pr["state"] == "closed":
                gh("PATCH", "/repos/" + DEST + "/pulls/" + str(existing["number"]), {"state": "closed"})
            with pr_map_lock:
                pr_map[pr["number"]] = existing["number"]
            with counters_lock:
                created_prs += 1
            ts_print(
                " PR #" + str(pr["number"]).rjust(4) +
                " -> #" + str(existing["number"]).ljust(4) +
                " [reused existing] " + pr["title"][:46]
            )
            return

        # Case 3: create a new PR, falling back to a placeholder on failure.
        payload: dict = {
            "title": pr["title"],
            "body": body,
            "head": head_branch,
            "base": base_branch,
        }
        try:
            new_pr = gh("POST", "/repos/" + DEST + "/pulls", payload)
            apply_metadata(pr, new_pr["number"])
            if pr["state"] == "closed":
                gh("PATCH", "/repos/" + DEST + "/pulls/" + str(new_pr["number"]), {"state": "closed"})
            with pr_map_lock:
                pr_map[pr["number"]] = new_pr["number"]
            with counters_lock:
                created_prs += 1
            ts_print(
                " PR #" + str(pr["number"]).rjust(4) +
                " -> #" + str(new_pr["number"]).ljust(4) +
                " " + pr["title"][:55]
            )
            return
        except RuntimeError as exc:
            err = str(exc)
            if "already exists" in err.lower():
                # Another worker (or an earlier run) created it in the meantime.
                existing = _find_existing_pr(DEST, head_branch, base_branch)
                if existing:
                    apply_metadata(pr, existing["number"])
                    with pr_map_lock:
                        pr_map[pr["number"]] = existing["number"]
                    with counters_lock:
                        created_prs += 1
                    ts_print(
                        " PR #" + str(pr["number"]).rjust(4) +
                        " -> #" + str(existing["number"]).ljust(4) +
                        " [reused on retry] " + pr["title"][:45]
                    )
                    return
            try:
                new_issue = create_issue_placeholder(
                    pr, head_branch, base_branch, body,
                    "Pull request creation failed: " + err
                )
                with pr_map_lock:
                    pr_map[pr["number"]] = new_issue["number"]
                with counters_lock:
                    placeholder_issues += 1
                ts_print(
                    " PR #" + str(pr["number"]).rjust(4) +
                    " -> issue #" + str(new_issue["number"]).ljust(4) +
                    " [placeholder] " + pr["title"][:48]
                )
            except RuntimeError as fallback_exc:
                log.warning("PR #%s failed (%s) and placeholder failed (%s)", pr["number"], exc, fallback_exc)
                with counters_lock:
                    skipped += 1
                ts_print(
                    " PR #" + str(pr["number"]).rjust(4) +
                    " -> SKIPPED [create+fallback failed] " +
                    pr["title"][:40]
                )

    with ThreadPoolExecutor(max_workers=MAX_PR_WORKERS) as executor:
        futures = [executor.submit(process_pr, pr) for pr in src_prs]
        for future in as_completed(futures):
            future.result()  # re-raises a worker's unexpected failure
    print(
        " ✅ Pull requests migrated (" +
        str(created_prs) + " PRs, " +
        str(placeholder_issues) + " placeholder issues, " +
        str(skipped) + " skipped)"
    )
    return pr_map
# ---------------------------------------------------------------------------
# STEP 7: PR review comments
# ---------------------------------------------------------------------------
def step_pr_review_comments(pr_map: dict):
    """Copy PR review comments as ordinary comments, preserving their order per PR.

    Each review comment becomes an issue comment on the destination PR (or
    placeholder issue), prefixed with an attribution header and a note naming
    the file and line it referred to. Work is parallelised across PRs only;
    the comments of one PR are posted sequentially in source order.

    A comment that cannot be posted is logged as a warning and skipped.

    Args:
        pr_map: ``{source PR number: destination PR/issue number}``.
    """
    print("\n[STEP 7] PR review comments")
    threads = []
    for src_pr_num, dest_pr_num in pr_map.items():
        comments = paginate("/repos/" + SRC + "/pulls/" + str(src_pr_num) + "/comments")
        if comments:
            threads.append((src_pr_num, dest_pr_num, comments))

    def copy_thread(thread) -> int:
        """Post one PR's review comments in order; return how many succeeded."""
        src_pr_num, dest_pr_num, comments = thread
        migrated = 0
        for c in comments:
            header = _attribution_header(c["html_url"], c["user"]["login"], c["created_at"])
            path_val = c.get("path") or "unknown"
            line_val = str(c.get("line") or c.get("original_line") or "?")
            file_note = "> **File:** `" + path_val + "` line " + line_val + "\n\n"
            body = header + file_note + (c.get("body") or "*(empty comment)*")
            try:
                gh("POST", "/repos/" + DEST + "/issues/" + str(dest_pr_num) + "/comments", {"body": body})
                migrated += 1
            except RuntimeError as exc:
                log.warning("Could not migrate review comment on PR #%s: %s", src_pr_num, exc)
        return migrated

    total = 0
    with ThreadPoolExecutor(max_workers=MAX_COMMENT_WORKERS) as executor:
        futures = [executor.submit(copy_thread, thread) for thread in threads]
        for future in as_completed(futures):
            total += future.result()
    print(" ✅ PR review comments migrated (" + str(total) + ")")
# ---------------------------------------------------------------------------
# STEP 8: Releases
# ---------------------------------------------------------------------------
def step_releases():
    """Recreate source releases (oldest first) for tags present in the destination.

    A release whose tag is missing in the destination is skipped with a
    warning. Each release body gets an attribution header. A missing author or
    publication date (both can be null, e.g. for drafts) is replaced by a
    fallback rather than crashing the run. A release that fails to create is
    logged as a warning and the loop continues.
    """
    print("\n[STEP 8] Releases")
    src_releases = paginate("/repos/" + SRC + "/releases")
    print(" Found " + str(len(src_releases)) + " releases in source")
    dest_tags = {t["name"] for t in paginate("/repos/" + DEST + "/tags")}
    created = 0
    # The listing is reversed so that releases are created in the opposite
    # order from how the API returned them.
    for rel in reversed(src_releases):
        if rel["tag_name"] not in dest_tags:
            log.warning("Tag %s not in dest -- skipping release %s", rel["tag_name"], rel.get("name"))
            continue
        author = (rel.get("author") or {}).get("login") or "ghost"
        published = rel.get("published_at") or rel.get("created_at") or "unpublished"
        header = (
            "> **Migrated from:** [" + rel["html_url"] + "](" + rel["html_url"] + ") \n"
            "> **Original author:** @" + author + " \n"
            "> **Published at:** " + published + "\n\n"
        )
        body = header + (rel.get("body") or "*(no release notes)*")
        payload = {
            "tag_name": rel["tag_name"],
            "name": rel.get("name") or rel["tag_name"],
            "body": body,
            "draft": rel.get("draft", False),
            "prerelease": rel.get("prerelease", False),
            "make_latest": "legacy",
        }
        try:
            gh("POST", "/repos/" + DEST + "/releases", payload)
            created += 1
            print(" " + rel["tag_name"] + " " + (rel.get("name") or "")[:55])
        except RuntimeError as exc:
            log.warning("Release %s failed: %s", rel["tag_name"], exc)
        time.sleep(0.3)  # small pause between creations
    print(" ✅ Releases migrated (" + str(created) + ")")
# ---------------------------------------------------------------------------
# STEP 9: Rename swap (optional)
# ---------------------------------------------------------------------------
def _rename_repo(full_repo: str, new_name: str):
    """Rename a GitHub repository, retrying transient failures.

    Up to four attempts are made with exponential backoff plus jitter. A 422
    "name already exists" error is not retried and is reported as a name
    collision.

    Raises:
        RuntimeError: On a name collision, or when the last attempt fails.
    """
    max_attempts = 4
    for attempt in range(max_attempts):
        try:
            gh("PATCH", "/repos/" + full_repo, {"name": new_name})
        except RuntimeError as exc:
            detail = str(exc)
            if "422" in detail and "name already exists" in detail:
                raise RuntimeError(
                    "Name collision: '" + new_name + "' already exists on this account. "
                    "This should not happen in the 3-step swap -- check your repo list."
                )
            if attempt == max_attempts - 1:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            log.warning("Rename failed (%s) -- retrying in %.1fs", exc, wait)
            time.sleep(wait)
        else:
            log.info("Renamed %s -> %s", full_repo, new_name)
            return
def step_rename():
    """
    Swap repository names in three renames so no two repos share a name:
    1. source      -> <src-name>-tmp-<timestamp>  (frees the source name)
    2. destination -> <src-name>                  (destination takes it over)
    3. temporary   -> <src-name>-old              (original becomes the archive)

    If rename 2 fails, rename 1 is rolled back and RuntimeError is raised.
    A failure of rename 3 is only logged.
    """
    print("\n[STEP 9] Rename (swap repo names)")
    src_parts = SRC.split("/")
    owner, src_name = src_parts[0], src_parts[1]
    dest_name = DEST.split("/")[1]
    tmp_name = src_name + "-tmp-" + str(int(time.time()))
    old_name = src_name + "-old"
    prefix = owner + "/"

    print(" Swap plan:")
    plan = [
        (src_name, tmp_name),
        (dest_name, src_name),
        (tmp_name, old_name),
    ]
    for index, (before, after) in enumerate(plan, start=1):
        print(" Step " + str(index) + ": " + prefix + before + " -> " + prefix + after)
    if not confirm("Proceed with rename swap?"):
        print(" Skipped rename.")
        return

    # Rename 1: free the original name.
    log.info("Step 1/3: %s -> %s", SRC, tmp_name)
    _rename_repo(SRC, tmp_name)
    time.sleep(2)

    # Rename 2: destination claims the original name; undo rename 1 on failure.
    log.info("Step 2/3: %s -> %s", DEST, src_name)
    try:
        _rename_repo(DEST, src_name)
    except Exception as exc:
        log.error("Step 2 failed -- rolling back Step 1")
        try:
            _rename_repo(prefix + tmp_name, src_name)
            log.info("Rollback OK: %s restored", src_name)
        except Exception as rb_exc:
            log.error("ROLLBACK FAILED: %s -- manual fix needed", rb_exc)
        raise RuntimeError("Rename aborted at Step 2: " + str(exc))
    time.sleep(2)

    # Rename 3: temporary name becomes *-old; a failure here is tolerated.
    log.info("Step 3/3: %s -> %s", tmp_name, old_name)
    try:
        _rename_repo(prefix + tmp_name, old_name)
    except Exception as exc:
        log.warning("Step 3 non-critical fail -- rename %s/%s to %s manually. Error: %s",
                    owner, tmp_name, old_name, exc)

    print(" ✅ Rename complete")
    print(" Canonical : " + prefix + src_name + " (migrated)")
    print(" Archive : " + prefix + old_name + " (original)")
# ---------------------------------------------------------------------------
# STEP 10: Archive (optional)
# ---------------------------------------------------------------------------
def step_archive():
    """Archive ``<source owner>/<source name>-old`` after confirmation."""
    print("\n[STEP 10] Archive old repo")
    parts = SRC.split("/")
    old_repo = parts[0] + "/" + parts[1] + "-old"
    if confirm("Archive " + old_repo + "?"):
        gh("PATCH", "/repos/" + old_repo, {"archived": True})
        print(" ✅ Archived")
# ---------------------------------------------------------------------------
# STEP 11: Verify
# ---------------------------------------------------------------------------
def step_verify():
    """Report whether the renamed repositories are reachable.

    Looks up ``<dest owner>/<src name>`` ("new repo") and
    ``<dest owner>/<src name>-old`` ("old repo") and prints one status line
    for each; a failed lookup is printed, not raised.
    """
    print("\n[STEP 11] Verify")
    base_slug = DEST.split("/")[0] + "/" + SRC.split("/")[1]
    targets = (
        (base_slug, "new repo"),
        (base_slug + "-old", "old repo"),
    )
    for repo_slug, label in targets:
        try:
            info = gh("GET", "/repos/" + repo_slug)
            suffix = " (archived)" if info.get("archived") else ""
            print(" ✅ " + label + ": " + info["full_name"] + suffix)
        except Exception as exc:
            print(" ❌ " + label + " (" + repo_slug + "): " + str(exc))
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Validate the configuration, then run every migration step in order.

    Steps that produce a mapping (milestones, issues, pull requests) store it
    in ``maps`` so the steps that follow receive the real numbers rather than
    empty placeholders: issues get their milestones, and issue comments and PR
    review comments are migrated for the items that were actually created.
    """
    validate()
    print("\n🚀 MIGRATION START\n")

    # Source-to-destination number mappings handed from one step to the next.
    maps = {"milestone": {}, "issue": {}, "pr": {}}

    def do_milestones():
        maps["milestone"] = step_milestones()

    def do_issues():
        maps["issue"] = step_issues(maps["milestone"])

    def do_issue_comments():
        step_issue_comments(maps["issue"])

    def do_pull_requests():
        maps["pr"] = step_pull_requests(maps["issue"], maps["milestone"])

    def do_pr_review_comments():
        step_pr_review_comments(maps["pr"])

    # NOTE(review): preflight_dest() is defined in this file but is not part of
    # this sequence -- confirm whether it should run before the git step.
    steps = [
        (step_git, "Git"),
        (step_collaborators, "Collaborators"),
        (step_about, "About"),
        (step_contributors, "Contributors (git)"),
        (step_labels, "Labels"),
        (do_milestones, "Milestones"),
        (do_issues, "Issues"),
        (do_issue_comments, "Issue comments"),
        (do_pull_requests, "Pull Requests"),
        (do_pr_review_comments, "PR review comments"),
        (step_releases, "Releases"),
        (step_rename, "Rename"),
        (step_archive, "Archive"),
        (step_verify, "Verify"),
    ]
    for step_func, _label in tqdm(steps, desc="Migration", ncols=100, colour="green"):
        step_func()
    print("\n\n✅ MIGRATION DONE\n")
# Script entry point: exit with status 1 on Ctrl-C or on any failure.
if __name__ == "__main__":
    exit_code = 0
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user.\n")
        exit_code = 1
    except Exception as exc:
        log.error("FAILED: %s", exc)
        exit_code = 1
    if exit_code:
        sys.exit(exit_code)
@haseeb-heaven
Copy link
Copy Markdown
Author

.env.example

GITHUB_TOKEN=ghp_....
SRC_REPO=haseeb-heaven/bookstore-fastapi
DEST_REPO=haseeb-heaven/bookstore-nodejs
SRC_BRANCH=main
DEST_BRANCH=main

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment