@natemcmaster
Last active February 1, 2026 01:50
Interactive 1Password duplicate login merger using op CLI
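Usage note (a sketch, assuming the script is saved under an illustrative name such as merge_1password_dupes.py): the inline script metadata below lets uv resolve the dependencies, so it can be run with "uv run merge_1password_dupes.py --vault Personal" (vault name illustrative), or marked executable and launched directly via the uvx shebang. The 1Password CLI (op) must be installed and able to sign in. Decisions are collected interactively first and only applied after a final confirmation; discarded items are archived with "op item delete --archive" rather than permanently deleted.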
#!/usr/bin/env -S uvx --with tldextract --with rich --with questionary python3
# /// script
# requires-python = ">=3.10"
# dependencies = ["tldextract", "rich", "questionary"]
# ///
"""Merge duplicate logins in a 1Password vault."""
import argparse
import json
import os
import subprocess
import sys
from collections import defaultdict
import questionary
import tldextract
from questionary import Style
from rich.console import Console
from rich.table import Table
from rich import box
console = Console()
Q_STYLE = Style([
("qmark", "fg:cyan bold"),
("question", "fg:white bold"),
("pointer", "fg:cyan bold"),
("highlighted", "fg:cyan bold"),
("selected", "fg:green"),
("answer", "fg:green bold"),
])
# Domains where specific subdomains indicate a different service/account.
# Maps base domain -> set of subdomains that should be kept distinct.
# A subdomain match preserves "subdomain.domain.suffix" as the key;
# anything else collapses to "domain.suffix".
SUBDOMAIN_EXCEPTIONS = {
"amazon.com": {"aws", "seller", "sellercentral", "advertising"},
"google.com": {"cloud", "console.cloud", "ads", "analytics", "firebase"},
"apple.com": {"developer"},
"microsoft.com": {"dev", "portal.azure", "entra"},
"salesforce.com": {"*"}, # every subdomain is a different org
"atlassian.net": {"*"}, # every subdomain is a different workspace
"zendesk.com": {"*"}, # every subdomain is a different org
"myshopify.com": {"*"}, # every subdomain is a different store
"shopify.com": {"accounts", "partners"},
"freshdesk.com": {"*"}, # every subdomain is a different org
"hubspot.com": {"app", "api"},
"okta.com": {"*"}, # every subdomain is a different org
"auth0.com": {"*"}, # every subdomain is a different tenant
}
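# Illustrative examples of how URLs are expected to collapse under the rules above
# (assumed behavior of extract_domain below; the URLs here are made up):
#   https://www.amazon.com/orders    -> amazon.com           (www collapses to the base)
#   https://aws.amazon.com/console   -> aws.amazon.com       (listed exception stays distinct)
#   https://smile.amazon.com/        -> amazon.com           (unlisted subdomain collapses)
#   https://acme.atlassian.net/wiki  -> acme.atlassian.net   (wildcard: every subdomain distinct)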
def extract_domain(url):
    """Reduce a URL to its registrable domain, keeping listed subdomains distinct."""
    ext = tldextract.extract(url)
base = f"{ext.domain}.{ext.suffix}".lower() if ext.suffix else ext.domain.lower()
    sub = (ext.subdomain or "").lower()
    if sub.startswith("www."):
        sub = sub[4:]
    if not sub or sub == "www":
        return base
exceptions = SUBDOMAIN_EXCEPTIONS.get(base)
if exceptions is None:
return base
# Wildcard: every subdomain is distinct
if "*" in exceptions:
return f"{sub}.{base}"
# Check if subdomain (or its prefix) matches an exception
for exc in exceptions:
if sub == exc or sub.startswith(exc + "."):
return f"{sub}.{base}"
return base
_op_env = None
def ensure_signed_in():
global _op_env
if _op_env is not None:
return
_op_env = dict(os.environ)
# Check if already signed in
result = subprocess.run(["op", "whoami"], capture_output=True, text=True, env=_op_env)
if result.returncode == 0:
return
# Sign in and capture exported env vars
with console.status("[bold]Signing in to 1Password..."):
result = subprocess.run(
["op", "signin"],
capture_output=True, text=True,
)
if result.returncode != 0:
console.print(f"[red]Failed to sign in:[/red] {result.stderr}")
sys.exit(1)
for line in result.stdout.splitlines():
# Parse: export KEY="VALUE"
if line.startswith("export "):
parts = line[7:].split("=", 1)
if len(parts) == 2:
_op_env[parts[0]] = parts[1].strip('"')
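# Note: ensure_signed_in() above parses stdout lines of the form
#   export OP_SESSION_my="<session token>"
# where the account shorthand ("my") and the token are illustrative, not real values.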
def run_op(*args):
    """Run an op CLI command and return its stdout; exits the script on failure."""
ensure_signed_in()
result = subprocess.run(["op"] + list(args), capture_output=True, text=True, env=_op_env)
if result.returncode != 0:
console.print(f"[red]Error running op {' '.join(args)}:[/red] {result.stderr}")
sys.exit(1)
return result.stdout
def list_vaults():
out = run_op("vault", "list", "--format", "json")
return json.loads(out)
def select_vault(vault_arg):
if vault_arg:
return vault_arg
vaults = list_vaults()
if len(vaults) == 1:
name = vaults[0]["name"]
console.print(f"Using vault: [bold cyan]{name}[/bold cyan]")
return name
choice = questionary.select(
"Select vault:",
choices=[v["name"] for v in vaults],
style=Q_STYLE,
).ask()
if choice is None:
sys.exit(0)
return choice
def fetch_all_logins(vault):
out = run_op("item", "list", "--vault", vault, "--categories", "Login", "--format", "json")
return json.loads(out)
def get_item_detail(item_id):
out = run_op("item", "get", item_id, "--format", "json")
return json.loads(out)
def group_key(item):
    """Build the deduplication key: (domain, username), or ('', username, title) when there is no URL."""
    username = (item.get("additional_information") or "").strip().lower()
urls = item.get("urls") or []
domain = ""
for u in urls:
href = u.get("href", "")
if href:
domain = extract_domain(href)
break
if not domain:
# No URL: use normalized title as part of the key so only
# items with the same title + username are grouped together.
title = item.get("title", "").strip().lower()
return ("", username, title)
return (domain, username)
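# Illustrative group keys (assumed values): a login for https://aws.amazon.com with username
# alice@example.com groups under ("aws.amazon.com", "alice@example.com"); an item with no URL
# falls back to ("", "alice@example.com", "<normalized title>").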
def find_duplicates(items):
groups = defaultdict(list)
for item in items:
key = group_key(item)
if not key[0] and not key[1]:
continue
groups[key].append(item)
return {k: v for k, v in groups.items() if len(v) > 1}
def has_passkey(detail):
"""Check if an item detail has a passkey field."""
for f in detail.get("fields", []):
if f.get("type") == "PASSKEY":
return True
return False
def sort_items(group_items, details):
"""Sort items: passkey holders first, then by updated_at descending."""
def sort_key(item):
pk = has_passkey(details[item["id"]]) if item["id"] in details else False
return (pk, item.get("updated_at", ""))
return sorted(group_items, key=sort_key, reverse=True)
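# Example ordering (illustrative): an item holding a passkey but last updated in 2024 still sorts
# ahead of a passkey-less item updated in 2025, since the passkey flag outranks recency.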
def format_item_label(item, badge=""):
urls = item.get("urls") or []
url = urls[0]["href"] if urls else "(no URL)"
if len(url) > 50:
url = url[:47] + "..."
updated = item.get("updated_at", "?")[:19].replace("T", " ")
suffix = f" ** {badge} **" if badge else ""
return f"{item['title']} | {url} | {updated}{suffix}"
def display_group(domain, username, sorted_items, details):
table = Table(
title=f"[bold]{domain or '(no domain)'}[/bold] [dim]|[/dim] [cyan]{username or '(no user)'}[/cyan]",
box=box.ROUNDED,
show_lines=True,
title_justify="left",
)
table.add_column("#", style="bold yellow", width=4, justify="center")
table.add_column("Title", style="bold white")
table.add_column("URL", style="blue", max_width=50, overflow="ellipsis")
table.add_column("Username", style="cyan")
table.add_column("Updated", style="green")
table.add_column("", width=14)
for i, item in enumerate(sorted_items):
urls = item.get("urls") or []
url = urls[0]["href"] if urls else "(no URL)"
badges = []
if item["id"] in details and has_passkey(details[item["id"]]):
badges.append("[bold magenta]passkey[/bold magenta]")
if i == 0:
badges.append("[bold green]recommended[/bold green]")
table.add_row(
str(i + 1),
item["title"],
url,
item.get("additional_information", "(none)"),
item.get("updated_at", "?")[:19].replace("T", " "),
" ".join(badges),
)
console.print()
console.print(table)
return sorted_items
def find_mergeable_fields(keeper_detail, discard_detail):
"""Find fields from the discarded item that the keeper is missing."""
mergeable = []
# Keeper field index by (purpose, label)
keeper_fields = keeper_detail.get("fields", [])
keeper_by_purpose = {}
keeper_labels = set()
for f in keeper_fields:
purpose = f.get("purpose")
if purpose:
keeper_by_purpose[purpose] = f
keeper_labels.add(f.get("label", "").lower())
for f in discard_detail.get("fields", []):
ftype = f.get("type", "")
purpose = f.get("purpose")
label = f.get("label", "")
value = f.get("value")
if not value:
continue
# Notes: merge if keeper has no notes
if purpose == "NOTES":
keeper_notes = keeper_by_purpose.get("NOTES", {}).get("value")
if not keeper_notes:
mergeable.append({"kind": "notes", "label": label, "value": value, "field": f})
continue
# TOTP: merge if keeper has none
if ftype == "OTP":
if not any(kf.get("type") == "OTP" for kf in keeper_fields):
mergeable.append({"kind": "totp", "label": label, "value": value, "field": f})
continue
# Skip standard username/password (those stay with the keeper)
if purpose in ("USERNAME", "PASSWORD"):
continue
# Custom/section fields: merge if keeper doesn't have same label
# Skip fields that just duplicate the username/email/password
        keeper_username = keeper_by_purpose.get("USERNAME", {}).get("value") or ""
        keeper_password = keeper_by_purpose.get("PASSWORD", {}).get("value") or ""
        if keeper_username and value.lower() == keeper_username.lower():
            continue
        if keeper_password and value == keeper_password:
            continue
discard_username = ""
discard_password = ""
for df in discard_detail.get("fields", []):
if df.get("purpose") == "USERNAME":
discard_username = df.get("value", "")
elif df.get("purpose") == "PASSWORD":
discard_password = df.get("value", "")
if value in (discard_username, discard_password):
continue
if label.lower() not in keeper_labels:
mergeable.append({"kind": "field", "label": label, "value": value, "type": ftype, "field": f})
# URLs: only merge if root domain differs from all keeper URLs
keeper_domains = {extract_domain(u.get("href", "")) for u in keeper_detail.get("urls", [])}
for u in discard_detail.get("urls", []):
href = u.get("href", "")
if href and extract_domain(href) not in keeper_domains:
mergeable.append({"kind": "url", "label": "URL", "value": href})
return mergeable
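# Shape of the returned entries (illustrative values): field/notes/totp entries look like
#   {"kind": "totp", "label": "one-time password", "value": "otpauth://...", "field": {...}}
# while URL entries carry only {"kind": "url", "label": "URL", "value": "https://example.com"}.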
def try_op_edit(keeper_id, *args):
"""Run op item edit, returning True on success, logging warning on failure."""
ensure_signed_in()
result = subprocess.run(["op", "item", "edit", keeper_id] + list(args),
capture_output=True, text=True, env=_op_env)
if result.returncode != 0:
console.print(f" [dim red]Failed to edit: {result.stderr.strip()}[/dim red]")
return False
return True
def apply_merge(keeper_id, mergeables):
"""Apply mergeable fields to the keeper item via op item edit."""
for m in mergeables:
kind = m["kind"]
if kind == "password":
try_op_edit(keeper_id, f"password={m['value']}")
elif kind == "notes":
try_op_edit(keeper_id, f"notesPlain={m['value']}")
elif kind == "totp":
try_op_edit(keeper_id, f"--totp={m['value']}")
elif kind == "url":
try_op_edit(keeper_id, f"--url={m['value']}")
elif kind == "field":
field_label = m["label"].replace(".", "_")
try_op_edit(keeper_id, f"Merged.{field_label}={m['value']}")
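# The assignments above rely on the op CLI's "field=value" / "section.field=value" edit syntax,
# so merged custom fields land in a "Merged" section on the keeper. Roughly equivalent to
# (illustrative id and value):
#   op item edit <keeper-id> "Merged.recovery_code=XXXX-XXXX"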
def archive_item(item_id):
ensure_signed_in()
subprocess.run(["op", "item", "delete", item_id, "--archive"], capture_output=True, text=True, env=_op_env)
def get_password(detail):
for f in detail.get("fields", []):
if f.get("purpose") == "PASSWORD":
return f.get("value", "")
return ""
def apply_pending(pending_actions):
"""Apply all pending merge/archive actions to the vault."""
if not pending_actions:
return 0
archived = 0
    total = sum((1 if a["mergeables"] else 0) + len(a["discards"]) for a in pending_actions)
step = 0
for action in pending_actions:
keeper_id = action["keeper"]["id"]
keeper_title = action["keeper"]["title"]
mergeables = action["mergeables"]
discards = action["discards"]
if mergeables:
step += 1
with console.status(f" [{step}/{total}] Merging fields into [bold]{keeper_title}[/bold]..."):
apply_merge(keeper_id, mergeables)
console.print(f" [cyan]:sparkles: Merged {len(mergeables)} field(s) into {keeper_title}[/cyan]")
for d in discards:
step += 1
with console.status(f" [{step}/{total}] Archiving [bold]{d['title']}[/bold]..."):
archive_item(d["id"])
console.print(f" [red]:wastebasket: Archived:[/red] {d['title']} [dim]({d['id']})[/dim]")
archived += 1
return archived
def main():
parser = argparse.ArgumentParser(description="Merge duplicate logins in a 1Password vault.")
parser.add_argument("--vault", help="Vault name to use (prompts if multiple and not specified)")
args = parser.parse_args()
vault = select_vault(args.vault)
with console.status(f"[bold]Fetching logins from {vault}..."):
items = fetch_all_logins(vault)
console.print(f"Found [bold]{len(items)}[/bold] logins.")
with console.status("[bold]Finding duplicates..."):
dupes = find_duplicates(items)
if not dupes:
console.print("[green]No duplicates found.[/green]")
return
sorted_dupes = sorted(dupes.items())
console.print(f"Found [bold yellow]{len(sorted_dupes)}[/bold yellow] duplicate groups.\n")
# Phase 1: Review all groups and collect decisions
    pending_actions = []  # list of action dicts (keeper, discards, mergeables), each tagged with its group_idx
group_idx = 0
auto_count = 0
while group_idx < len(sorted_dupes):
key, group_items = sorted_dupes[group_idx]
domain = key[0]
username = key[1]
# Fetch full details for all items before display
details = {}
with console.status(f" Fetching details for group {group_idx + 1}/{len(sorted_dupes)}..."):
for item in group_items:
details[item["id"]] = get_item_detail(item["id"])
# Sort: passkey holders first, then newest
sorted_items = sort_items(group_items, details)
keeper = sorted_items[0]
discards = sorted_items[1:]
keeper_detail = details[keeper["id"]]
# Check for discrepancies that require human review
passwords = {item["id"]: get_password(details[item["id"]]) for item in sorted_items}
unique_pws = {pw for pw in passwords.values() if pw}
pw_match = len(unique_pws) <= 1
# Check if any discards have extra fields/TOTP/notes/passkeys the keeper lacks
has_extra = False
for d in discards:
if find_mergeable_fields(keeper_detail, details[d["id"]]):
has_extra = True
break
# Check if any discard has a passkey the keeper doesn't
keeper_has_pk = has_passkey(keeper_detail)
discard_has_pk = any(has_passkey(details[d["id"]]) for d in discards)
passkey_conflict = discard_has_pk and not keeper_has_pk
needs_review = not pw_match or has_extra or passkey_conflict
if not needs_review:
# Auto-select: keeper is clearly the best, no data loss
pending_actions = [a for a in pending_actions if a["group_idx"] != group_idx]
pending_actions.append({
"group_idx": group_idx,
"keeper": keeper,
"discards": discards,
"mergeables": [],
})
auto_count += 1
group_idx += 1
continue
# --- Interactive review ---
console.rule(f"[bold]Group {group_idx + 1}/{len(sorted_dupes)}[/bold]", style="bright_blue")
display_group(domain, username, sorted_items, details)
# Show password status
if pw_match:
console.print(f" [green]:locked: Passwords match across all items[/green]")
else:
console.print(f" [bold yellow]:warning: Passwords differ — newest password will be kept[/bold yellow]")
if passkey_conflict:
console.print(f" [bold magenta]:warning: A discarded item has a passkey the keeper lacks![/bold magenta]")
# Build choices for questionary select
choices = []
for i, item in enumerate(sorted_items):
pk = has_passkey(details[item["id"]])
badge = "passkey" if pk else ("recommended" if i == 0 else "")
label = format_item_label(item, badge=badge)
choices.append(questionary.Choice(title=f"Keep: {label}", value=i))
choices.append(questionary.Choice(title="Skip this group", value="skip"))
if group_idx > 0:
choices.append(questionary.Choice(title="Go back to previous group", value="back"))
choices.append(questionary.Choice(title="Apply selections so far & quit", value="save"))
choices.append(questionary.Choice(title="Quit without applying", value="quit"))
answer = questionary.select(
"Which item to keep?",
choices=choices,
default=choices[0],
style=Q_STYLE,
).ask()
if answer is None or answer == "quit":
console.print(f"\n[bold]Quit without applying. No changes made.[/bold]")
return
if answer == "save":
break
if answer == "back":
pending_actions = [a for a in pending_actions if a["group_idx"] != group_idx - 1]
group_idx -= 1
continue
if answer == "skip":
console.print("[dim] Skipped.[/dim]")
group_idx += 1
continue
idx = answer
keeper = sorted_items[idx]
discards = [item for i, item in enumerate(sorted_items) if i != idx]
keeper_detail = details[keeper["id"]]
# If passwords differ, queue updating keeper to the newest password
all_mergeables = []
keeper_pw = get_password(keeper_detail)
newest_pw = keeper_pw
newest_pw_date = keeper.get("updated_at", "")
for d in sorted_items:
d_pw = get_password(details[d["id"]])
if d_pw and d.get("updated_at", "") > newest_pw_date:
newest_pw = d_pw
newest_pw_date = d.get("updated_at", "")
if newest_pw and newest_pw != keeper_pw:
all_mergeables.append({"kind": "password", "label": "password", "value": newest_pw})
skip_group = False
for d in discards:
d_detail = details[d["id"]]
mergeables = find_mergeable_fields(keeper_detail, d_detail)
if mergeables:
console.print(f"\n [bold cyan]Extra fields found in '{d['title']}':[/bold cyan]")
merge_table = Table(box=box.SIMPLE, show_header=True, padding=(0, 2))
merge_table.add_column("Type", style="yellow")
merge_table.add_column("Label", style="white")
merge_table.add_column("Value", style="dim", max_width=60, overflow="ellipsis")
for m in mergeables:
val = m["value"]
if len(val) > 60:
val = val[:57] + "..."
merge_table.add_row(m["kind"].upper(), m["label"], val)
console.print(merge_table)
action = questionary.select(
f" Merge these into keeper?",
choices=[
questionary.Choice(title="Merge extra fields into keeper, then archive", value="merge"),
questionary.Choice(title="Archive without merging (discard extra fields)", value="discard"),
questionary.Choice(title="Skip this group", value="skip"),
],
style=Q_STYLE,
).ask()
if action is None or action == "skip":
console.print(" [dim]Skipping this group.[/dim]")
skip_group = True
break
if action == "merge":
all_mergeables.extend(mergeables)
if skip_group:
group_idx += 1
continue
# Remove any prior action for this group (in case we went back)
pending_actions = [a for a in pending_actions if a["group_idx"] != group_idx]
pending_actions.append({
"group_idx": group_idx,
"keeper": keeper,
"discards": discards,
"mergeables": all_mergeables,
})
console.print(f" [green]:white_check_mark: Queued:[/green] keep [bold]{keeper['title']}[/bold], archive {len(discards)}")
group_idx += 1
if auto_count:
console.print(f"\n[dim]{auto_count} group(s) auto-selected (identical passwords, no extra fields)[/dim]")
# Phase 2: Apply all pending actions
if not pending_actions:
console.print("\n[dim]No actions to apply.[/dim]")
return
console.print(f"\n[bold]Ready to apply {len(pending_actions)} group(s):[/bold]")
for a in pending_actions:
console.print(f" Keep [bold]{a['keeper']['title']}[/bold], archive {len(a['discards'])} item(s)"
+ (f", merge {len(a['mergeables'])} field(s)" if a['mergeables'] else ""))
confirm = questionary.confirm(
"\nApply all changes?", default=True, style=Q_STYLE
).ask()
if not confirm:
console.print("[dim]Cancelled. No changes made.[/dim]")
return
archived_count = apply_pending(pending_actions)
console.print(f"\n[bold green]Done.[/bold green] Archived [bold yellow]{archived_count}[/bold yellow] items total.")
if __name__ == "__main__":
main()