Last active
May 20, 2026 11:50
-
-
Save kebman/16f9d84a58a5e39d1725fca4cde273b9 to your computer and use it in GitHub Desktop.
smartpatch.py is a conservative fuzzy patch applier for AI-generated unified diffs.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| smartpatch.py — conservative fuzzy patch applier for AI-made unified diffs. | |
| Design goal: | |
| Treat AI diffs as intent packets, not as mechanically exact Git patches. | |
| Ignore unreliable hunk line numbers. Prefer exact content/context matches. | |
| Apply only when confidence is high enough. Report everything else. | |
| Safe default: | |
| This script does a dry run unless --write is passed. | |
| Usage: | |
| python3 ~/bin/smartpatch.py apply ai.patch | |
| python3 ~/bin/smartpatch.py apply ai.patch --write --backup | |
| python3 ~/bin/smartpatch.py apply ai.patch --report | |
| python3 ~/bin/smartpatch.py apply ai.patch --report custom-report.md | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import difflib | |
| import json | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| from dataclasses import dataclass, field, asdict | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Literal | |
# Kind tag carried by every parsed hunk line (see DiffLine.kind).
DiffKind = Literal["context", "remove", "add"]
# File suffixes (compared lower-cased) that enable the Markdown-specific
# repairs and bullet-aware matching.
MARKDOWN_EXTENSIONS = {".md", ".mdx", ".markdown"}
# NOTE(review): not referenced in this part of the file; presumably suffixes
# that get stricter matching rules — confirm against the users further down.
STRICT_EXTENSIONS = {".py", ".yaml", ".yml", ".json", ".toml"}
# Stripped line contents that is_placeholder() treats as elision markers.
PLACEHOLDER_LINES = {"...", "[… ]", "[...]", "<...>"}
@dataclass
class DiffLine:
    """One line of a hunk body after the diff marker has been interpreted."""

    # "context", "remove" or "add".
    kind: DiffKind
    # Line content with the diff marker (and any repaired spacing) removed.
    text: str
    # The hunk line as it was read from (or synthesized for) the patch text.
    raw: str
@dataclass
class Hunk:
    """A single hunk: its header line plus the parsed body lines."""

    # The `@@ ...` header line; split hunks get a "[smartpatch split i/n]" suffix.
    header: str | None
    # Body lines in patch order.
    lines: list[DiffLine] = field(default_factory=list)
    # Human-readable notes about anything unusual met while parsing this hunk.
    parse_warnings: list[str] = field(default_factory=list)
@dataclass
class PatchFile:
    """All hunks that a patch addresses to one file."""

    # Path from the old-side header; None when absent or /dev/null.
    old_path: str | None = None
    # Path from the new-side header; None when absent or /dev/null.
    new_path: str | None = None
    # Hunks in the order they appear in the patch.
    hunks: list[Hunk] = field(default_factory=list)
    # File-level parse notes (e.g. a hunk found before any file path).
    parse_warnings: list[str] = field(default_factory=list)

    @property
    def target_path(self) -> str | None:
        """Return the path to patch: the new path if set, else the old path."""
        return self.new_path or self.old_path
@dataclass
class MatchResult:
    """Result record for locating a hunk inside a target file.

    NOTE(review): the code that fills this in is outside this part of the
    file; field meanings below are inferred from names and should be confirmed.
    """

    # Whether a usable location was found.
    found: bool
    # Presumably a 0-based, end-exclusive line range like the block finders
    # return — TODO confirm.
    start: int | None = None
    end: int | None = None
    # Score for the match; defaults to 0.0.
    confidence: float = 0.0
    # Name of the matching strategy; defaults to "not_found".
    method: str = "not_found"
    warnings: list[str] = field(default_factory=list)
    missing_lines: list[str] = field(default_factory=list)
    candidate_count: int = 0
@dataclass
class HunkReport:
    """Report entry describing what happened to one hunk of one file.

    NOTE(review): producers and consumers are outside this part of the file.
    """

    file: str
    hunk_index: int
    # One of the four outcomes allowed by the Literal below.
    action: Literal["applied", "would_apply", "already_applied", "skipped"]
    confidence: float
    method: str
    # Presumably mirrors the match location — TODO confirm indexing base.
    start_line: int | None
    end_line: int | None
    warnings: list[str] = field(default_factory=list)
    missing_lines: list[str] = field(default_factory=list)
    details: str = ""
@dataclass
class FileReport:
    """Per-file report: outcome counters plus the individual hunk reports.

    NOTE(review): how the counters are maintained is not visible here; they
    share their names with the HunkReport.action values.
    """

    file: str
    exists: bool
    hunks_total: int
    applied: int = 0
    would_apply: int = 0
    already_applied: int = 0
    skipped: int = 0
    warnings: list[str] = field(default_factory=list)
    hunk_reports: list[HunkReport] = field(default_factory=list)
@dataclass
class RunReport:
    """Top-level report for one run: settings, totals and per-file reports.

    NOTE(review): the code that builds and prints this is outside this part
    of the file.
    """

    # Presumably the patch file path and repository root as strings — confirm.
    patch: str
    root: str
    dry_run: bool
    min_confidence: float
    files_total: int
    hunks_total: int
    applied: int = 0
    would_apply: int = 0
    already_applied: int = 0
    skipped: int = 0
    warnings: list[str] = field(default_factory=list)
    file_reports: list[FileReport] = field(default_factory=list)
| def clean_path(token: str) -> str | None: | |
| """Clean paths from diff headers. Returns None for /dev/null. | |
| Important: repo paths may contain normal spaces, especially Markdown docs like | |
| `docs/ux/Organizer Views/Bundling/Bundling View.md`. Therefore we only strip | |
| tab-separated metadata, not ordinary spaces inside the path. | |
| """ | |
| token = token.strip() | |
| if not token: | |
| return None | |
| # Git file headers usually separate optional timestamps with a tab. | |
| token = token.split("\t", 1)[0].strip() | |
| # Remove simple surrounding quotes if an AI/Git output quoted the path. | |
| if len(token) >= 2 and token[0] == token[-1] and token[0] in {"'", '"'}: | |
| token = token[1:-1] | |
| if token == "/dev/null": | |
| return None | |
| if token.startswith("a/") or token.startswith("b/"): | |
| token = token[2:] | |
| return token or None | |
def parse_diff_git_paths(line: str) -> tuple[str | None, str | None]:
    """Return (old_path, new_path) from a `diff --git a/path b/path` line.

    Git quotes unusual paths, but AI-written diffs often do not. When the
    remainder starts with `a/` and contains ` b/`, the first ` b/` is taken
    as the boundary so unquoted paths with spaces survive. Otherwise the
    remainder is split once on whitespace. Returns (None, None) when two
    paths cannot be identified.
    """
    remainder = line[len("diff --git ") :].strip()

    old_side, marker, new_tail = remainder.partition(" b/")
    if marker and remainder.startswith("a/"):
        # Re-attach the "b/" that partition consumed together with the space.
        return clean_path(old_side), clean_path("b/" + new_tail)

    pieces = remainder.split(maxsplit=1)
    if len(pieces) != 2:
        return None, None
    old_token, new_token = pieces
    return clean_path(old_token), clean_path(new_token)
def normalize_line(text: str) -> str:
    """Trim *text* and collapse each internal whitespace run to one space."""
    trimmed = text.strip()
    return re.sub(r"\s+", " ", trimmed)
def normalize_block(lines: list[str]) -> list[str]:
    """Return a new list with normalize_line applied to every entry."""
    return list(map(normalize_line, lines))
def load_target_lines(root: Path, rel: str | None) -> list[str]:
    """Best-effort read of the target file's lines for pre-parse repairs.

    Args:
        root: Directory the patch paths are relative to.
        rel: Path taken from a diff header, or None/empty when unknown.

    Returns:
        The file content split into lines (without line endings), or an empty
        list when the path is missing, rejected by safe_target_path, not a
        readable regular file, or not valid UTF-8. This function never raises
        for those conditions: the repair heuristics simply run without target
        knowledge.
    """
    if not rel:
        return []
    try:
        path = safe_target_path(root, rel)
    except Exception:
        # safe_target_path is defined elsewhere; any rejection means "no
        # usable target", which is not an error at this stage.
        return []
    try:
        return path.read_text(encoding="utf-8").splitlines()
    except (OSError, UnicodeDecodeError):
        # OSError covers a missing file, a directory at that path, permission
        # problems and races between resolving and reading.
        return []
def load_target_line_set(root: Path, rel: str | None) -> set[str]:
    """Return the distinct lines of the target file (empty set if unreadable)."""
    return {*load_target_lines(root, rel)}
def unique_target_line_match(target_lines: list[str], raw_line: str) -> str | None:
    """Return the one target line that corresponds to *raw_line*, if unambiguous.

    Three comparisons are tried from strictest to loosest: byte-exact,
    equal after stripping both ends, and equal after whitespace
    normalization. The first comparison that yields exactly one hit wins;
    a comparison with zero or several hits falls through to the next one.
    Blank input never matches.
    """
    wanted_stripped = raw_line.strip()
    if wanted_stripped == "":
        return None
    wanted_normalized = normalize_line(raw_line)

    comparisons = (
        lambda candidate: candidate == raw_line,
        lambda candidate: candidate.strip() == wanted_stripped,
        lambda candidate: normalize_line(candidate) == wanted_normalized,
    )
    for accepts in comparisons:
        hits = [candidate for candidate in target_lines if accepts(candidate)]
        if len(hits) == 1:
            return hits[0]
    return None
def looks_like_markdown_bullet(line: str) -> bool:
    """Tell whether *line*, ignoring indentation, starts with a list marker.

    Accepted markers are `-`, `*`, `+` and numbered forms such as `1.` or
    `2)`, each followed by whitespace.
    """
    unindented = line.lstrip()
    return re.match(r"^([-*+]\s+|\d+[.)]\s+)", unindented) is not None
def looks_like_markdown_horizontal_rule(line: str) -> bool:
    """Tell whether the stripped line is exactly `---`, `***` or `___`."""
    rule_forms = ("---", "***", "___")
    return any(line.strip() == form for form in rule_forms)
| def markdown_bullet_match(line: str) -> re.Match[str] | None: | |
| return re.match( | |
| r"^(?P<indent>\s*)(?P<marker>[-*+]|\d+[.)])\s+(?P<body>.*?)(?P<trailing>\s*)$", | |
| line, | |
| ) | |
def markdown_bullet_body(line: str) -> str | None:
    """Return the whitespace-normalized text of a list item, else None."""
    parsed = markdown_bullet_match(line)
    return None if parsed is None else normalize_line(parsed.group("body"))
def markdown_bullet_marker(line: str) -> str | None:
    """Return the list marker of *line* (e.g. `-` or `3.`), else None."""
    parsed = markdown_bullet_match(line)
    return None if parsed is None else parsed.group("marker")
def rewrite_markdown_bullet_marker(line: str, marker: str) -> str:
    """Return *line* with its list marker replaced by *marker*.

    Indentation, body and trailing whitespace are kept; the gap between
    marker and body becomes a single space. Non-list lines are returned
    unchanged.
    """
    parsed = markdown_bullet_match(line)
    if parsed is None:
        return line
    parts = parsed.groupdict()
    return parts["indent"] + marker + " " + parts["body"] + parts["trailing"]
def markdown_equiv_key(line: str) -> tuple[str, str]:
    """Return a comparison key that ignores list-marker style and spacing.

    List items map to ("md_bullet", normalized body); every other line maps
    to ("line", normalized line).
    """
    bullet_text = markdown_bullet_body(line)
    if bullet_text is None:
        return ("line", normalize_line(line))
    return ("md_bullet", bullet_text)
def markdown_lines_equiv(a: str, b: str) -> bool:
    """Tell whether two lines share the same markdown_equiv_key."""
    key_a = markdown_equiv_key(a)
    key_b = markdown_equiv_key(b)
    return key_a == key_b
def unique_markdown_bullet_target_match(target_lines: list[str], raw_line: str) -> str | None:
    """Return the single target list item whose body equals that of *raw_line*.

    Marker style and whitespace are ignored. Returns None when *raw_line* is
    not a list item or when zero or several target lines qualify.
    """
    wanted_body = markdown_bullet_body(raw_line)
    if wanted_body is None:
        return None
    hits = [
        candidate
        for candidate in target_lines
        if markdown_bullet_body(candidate) == wanted_body
    ]
    return hits[0] if len(hits) == 1 else None
def find_markdown_equiv_block(file_lines: list[str], block: list[str]) -> list[tuple[int, int]]:
    """Find every position where *block* matches under Markdown equivalence.

    Returns (start, end) pairs, 0-based with exclusive end, in ascending
    order. An empty block matches nowhere.
    """
    if not block:
        return []
    width = len(block)
    return [
        (start, start + width)
        for start in range(len(file_lines) - width + 1)
        if all(
            markdown_lines_equiv(file_lines[start + offset], wanted)
            for offset, wanted in enumerate(block)
        )
    ]
def replacement_preserving_matched_context(
    hunk: Hunk,
    matched_old_lines: list[str],
    path: Path,
) -> list[str]:
    """Build the replacement lines for a matched hunk.

    Context lines are copied from the file's own matched lines (so the
    file's exact spelling survives a fuzzy match), removed lines are dropped
    and added lines are emitted. For Markdown targets, added `-`/`*`/`+`
    bullets are rewritten to the bullet style used by the matched region:
    initially the first bullet found there, later the bullet of the most
    recently consumed removed line.
    """
    bullet_markers = {"-", "*", "+"}
    is_markdown = path.suffix.lower() in MARKDOWN_EXTENSIONS

    preferred: str | None = None
    if is_markdown:
        preferred = next(
            (
                found
                for found in map(markdown_bullet_marker, matched_old_lines)
                if found in bullet_markers
            ),
            None,
        )

    result: list[str] = []
    cursor = 0  # index of the next unconsumed matched file line
    available = len(matched_old_lines)

    for entry in hunk.lines:
        kind = entry.kind
        if kind == "context":
            if cursor < available:
                result.append(matched_old_lines[cursor])
                cursor += 1
            else:
                # Ran out of matched lines: fall back to the patch's text.
                result.append(entry.text)
        elif kind == "remove":
            if cursor < available:
                found = markdown_bullet_marker(matched_old_lines[cursor])
                if found in bullet_markers:
                    preferred = found
                cursor += 1
        elif kind == "add":
            new_line = entry.text
            if (
                is_markdown
                and preferred in bullet_markers
                and markdown_bullet_marker(new_line) in bullet_markers
            ):
                new_line = rewrite_markdown_bullet_marker(new_line, preferred)
            result.append(new_line)

    return result
def hunk_line_has_diff_marker(line: str) -> bool:
    """Tell whether *line* begins with a space, `+`, `-` or a backslash."""
    # Slicing keeps the empty string safe: "" is not a marker.
    return line[:1] in {" ", "+", "-", "\\"}
def looks_like_hunk_header(line: str) -> bool:
    """Tell whether *line* is an optionally indented `@@ -<digit>` header."""
    header_start = re.match(r"^\s*@@\s+-\d", line)
    return header_start is not None
def looks_like_outer_code_fence(line: str) -> bool:
    """Detect a wrapper fence placed around a whole diff in a Markdown reply.

    Only fences of four or more backticks, optionally tagged `diff` or
    `patch`, count. Three-backtick fences are deliberately not matched:
    Markdown target files legitimately contain them inside hunks.
    """
    candidate = line.strip()
    return re.match(r"^`{4,}(?:diff|patch)?\s*$", candidate) is not None
| def split_attached_code_fence_context(line: str) -> list[str] | None: | |
| """Repair naked context like ```textOrganizer into two context lines.""" | |
| m = re.match(r"^```([A-Za-z0-9_-]+)(\S.*)$", line) | |
| if not m: | |
| return None | |
| lang, rest = m.groups() | |
| return [f" ```{lang}", f" {rest}"] | |
def strip_ai_text_fence_prefix(text: str) -> str:
    """Remove one leading broken fence prefix from hunk content.

    The prefixes "```text", "`text" and "```" are tried in that order and
    only the first one that applies is removed.
    """
    known_prefixes = ("```text", "`text", "```")
    hit = next((p for p in known_prefixes if text.startswith(p)), "")
    return text[len(hit):]
| def repair_inline_text_replacement(line: str) -> list[str] | None: | |
| """Repair collapsed AI output like: | |
| -`textOld question? +`textNew question? | |
| into: | |
| -Old question? | |
| +New question? | |
| """ | |
| m = re.match(r"^-\s*`{1,3}text(?P<old>.+?)\s+\+\s*`{1,3}text(?P<new>.+)$", line) | |
| if not m: | |
| return None | |
| return [f"-{m.group('old').strip()}", f"+{m.group('new').strip()}"] | |
| def repair_attached_fence_change_pair( | |
| current: str, | |
| next_line: str | None, | |
| ) -> tuple[list[str] | None, bool]: | |
| """Repair pair like: | |
| -```textOld | |
| +```textNew | |
| into: | |
| -Old | |
| +New | |
| Returns (repaired_lines, consumed_next). | |
| """ | |
| if next_line is None: | |
| return None, False | |
| m1 = re.match(r"^-\s*```text(?P<old>.+)$", current) | |
| m2 = re.match(r"^\+\s*```text(?P<new>.+)$", next_line) | |
| if m1 and m2: | |
| return [f"-{m1.group('old').strip()}", f"+{m2.group('new').strip()}"], True | |
| return None, False | |
| def repair_single_attached_fence_change(line: str) -> str | None: | |
| """Repair single line like -```textOld or +```textNew.""" | |
| m = re.match(r"^(?P<marker>[+-])\s*```text(?P<body>.+)$", line) | |
| if not m: | |
| return None | |
| return f"{m.group('marker')}{m.group('body').strip()}" | |
| def repair_compact_markdown_bullet_change( | |
| line: str, | |
| target_lines: list[str], | |
| ) -> str | None: | |
| """Repair accidentally indented diff-marker lines. | |
| Examples: | |
| ' -* Offer list' -> '-* Offer list' | |
| ' +* Offer list' -> '+* Offer list' | |
| ' -foo' -> '-foo' | |
| ' +foo' -> '+foo' | |
| Deliberately does not touch valid column-1 diff lines. | |
| """ | |
| m = re.match(r"^\s+(?P<marker>[+-])(?P<body>\S.*)$", line) | |
| if not m: | |
| return None | |
| return f"{m.group('marker')}{m.group('body')}" | |
def repair_probable_markdown_context_bullet(
    line: str,
    current_path: str | None,
    target_lines: list[str],
) -> str | None:
    """Turn a naked Markdown list line into a context line when it is safe.

    A hunk line such as '* public links', '- public links' or
    '+ public links' is ambiguous: the leading character may be a bullet or
    a diff marker. For Markdown targets, if the item exists exactly once in
    the target file (first by line comparison, then by bullet body so the
    marker style may differ), the target's own line is returned prefixed
    with the context marker. Otherwise None is returned.
    """
    if not current_path:
        return None
    if Path(current_path).suffix.lower() not in MARKDOWN_EXTENSIONS:
        return None
    if not looks_like_markdown_bullet(line):
        return None

    by_line = unique_target_line_match(target_lines, line)
    if by_line and looks_like_markdown_bullet(by_line):
        return " " + by_line

    by_body = unique_markdown_bullet_target_match(target_lines, line)
    return " " + by_body if by_body else None
def repair_parsed_markdown_context_removals(
    hunk: Hunk,
    file_lines: list[str],
    path: Path,
) -> tuple[Hunk, list[str]]:
    """Reclassify parsed removals that are really Markdown list context.

    Malformed AI diffs sometimes emit an unchanged list item with only its
    bullet, so the parser reads the bullet `-` as a removal marker. For
    Markdown targets, every removed line that looks like a bullet and
    corresponds to exactly one bullet line in the file is converted into a
    context line carrying the file's own text. That lets add-only insertion
    logic run instead of falling through to fuzzy matching.

    Returns the original hunk and an empty list when nothing changed,
    otherwise a new Hunk plus one warning per reclassified line.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return hunk, []

    notes: list[str] = []
    rebuilt: list[DiffLine] = []

    for entry in hunk.lines:
        outcome = entry
        if entry.kind == "remove" and looks_like_markdown_bullet(entry.text):
            located = unique_target_line_match(
                file_lines, entry.text
            ) or unique_markdown_bullet_target_match(file_lines, entry.text)
            if located and looks_like_markdown_bullet(located):
                outcome = DiffLine("context", located, " " + located)
                notes.append(
                    f"Parsed Markdown bullet removal reclassified as context: {entry.text}"
                )
        rebuilt.append(outcome)

    if not notes:
        return hunk, []

    repaired_hunk = Hunk(
        header=hunk.header,
        lines=rebuilt,
        parse_warnings=list(hunk.parse_warnings),
    )
    return repaired_hunk, notes
def repair_ai_hunk_lines(
    hunk_lines: list[tuple[int, str]],
    current_path: str | None,
    target_lines: list[str],
    warnings: list[str],
) -> list[str]:
    """Repair one hunk before parse_patch sees it.

    Each buffered line is run through a fixed sequence of repairs; the first
    repair that applies decides the output for that line and the rest are
    skipped. The order therefore matters.

    Args:
        hunk_lines: (patch line number, text) pairs of the hunk body.
        current_path: Target path of the enclosing file diff, if known.
        target_lines: Lines of the target file (may be empty).
        warnings: Mutated in place; one message is appended per repair.

    Returns:
        The repaired hunk body lines, in order.
    """
    is_markdown = bool(
        current_path and Path(current_path).suffix.lower() in MARKDOWN_EXTENSIONS
    )
    repaired: list[str] = []
    i = 0
    # Index-based loop because the pair repair may consume two lines at once.
    while i < len(hunk_lines):
        idx, line = hunk_lines[i]
        next_line = hunk_lines[i + 1][1] if i + 1 < len(hunk_lines) else None
        # ChatGPT wrapping artifact: a line containing only spaces inside a hunk.
        # A truly empty line is not dropped here; it falls through to the
        # naked-line handling below.
        if line and line.strip() == "":
            warnings.append(f"Line {idx}: dropped whitespace-only hunk artifact line")
            i += 1
            continue
        # Bare Markdown horizontal rule. Must be context, not a diff removal.
        if is_markdown and looks_like_markdown_horizontal_rule(line):
            repaired.append(" " + line.strip())
            warnings.append(f"Line {idx}: repaired Markdown horizontal rule as context")
            i += 1
            continue
        # Remove and add collapsed onto one line.
        inline = repair_inline_text_replacement(line)
        if inline:
            repaired.extend(inline)
            warnings.append(f"Line {idx}: repaired collapsed inline text replacement")
            i += 1
            continue
        # Remove/add pair with content glued to a ```text fence.
        pair, consumed_next = repair_attached_fence_change_pair(line, next_line)
        if pair:
            repaired.extend(pair)
            warnings.append(f"Line {idx}: repaired attached ```text replacement pair")
            i += 2 if consumed_next else 1
            continue
        # Same fence problem on a single + or - line.
        single_fence = repair_single_attached_fence_change(line)
        if single_fence:
            repaired.append(single_fence)
            warnings.append(f"Line {idx}: repaired attached ```text change line")
            i += 1
            continue
        # Diff marker pushed away from column one by indentation.
        compact_bullet = repair_compact_markdown_bullet_change(line, target_lines)
        if compact_bullet:
            repaired.append(compact_bullet)
            warnings.append(f"Line {idx}: repaired indented diff marker")
            i += 1
            continue
        # Naked Markdown bullet that exists uniquely in the target: context.
        context_bullet = repair_probable_markdown_context_bullet(line, current_path, target_lines)
        if context_bullet:
            repaired.append(context_bullet)
            warnings.append(
                f"Line {idx}: repaired probable Markdown bullet context line in {current_path}"
            )
            i += 1
            continue
        # Anything still lacking a diff marker becomes context.
        if not hunk_line_has_diff_marker(line):
            split_context = split_attached_code_fence_context(line)
            if split_context:
                repaired.extend(split_context)
                warnings.append(f"Line {idx}: repaired attached Markdown code-fence context line")
            else:
                repaired.append(" " + line)
                if line:
                    warnings.append(f"Line {idx}: repaired naked hunk line as context")
                else:
                    warnings.append(f"Line {idx}: repaired naked blank hunk line as context")
            i += 1
            continue
        # Well-formed hunk line: pass through untouched.
        repaired.append(line)
        i += 1
    return repaired
def repair_ai_patch_text(patch_text: str, root: Path) -> tuple[str, list[str]]:
    """Repair common AI diff formatting mistakes before parsing.

    The patch is scanned line by line. Header lines are passed through (and
    used to track which target file is current), while hunk body lines are
    buffered and handed to repair_ai_hunk_lines whenever the hunk ends.

    Args:
        patch_text: Raw patch text as received.
        root: Directory used to load the target files named in the headers.

    Returns:
        (repaired patch text ending in a newline, list of repair warnings).
    """
    out: list[str] = []
    warnings: list[str] = []
    in_hunk = False
    current_path: str | None = None
    target_lines: list[str] = []
    hunk_buffer: list[tuple[int, str]] = []

    def flush_hunk() -> None:
        # Repair and emit the buffered hunk body, then start a fresh buffer.
        nonlocal hunk_buffer
        if hunk_buffer:
            out.extend(
                repair_ai_hunk_lines(
                    hunk_lines=hunk_buffer,
                    current_path=current_path,
                    target_lines=target_lines,
                    warnings=warnings,
                )
            )
        hunk_buffer = []

    for idx, line in enumerate(patch_text.splitlines(), start=1):
        # Ignore outer Markdown fences used to wrap the diff. Four or more
        # backticks are wrapper fences; normal triple-backtick fences may be
        # real Markdown content and must stay.
        if looks_like_outer_code_fence(line):
            flush_hunk()
            warnings.append(f"Line {idx}: ignored outer Markdown code fence")
            continue
        # AI sometimes indents hunk headers. Promote them back to real headers.
        if looks_like_hunk_header(line):
            flush_hunk()
            stripped = line.strip()
            if stripped != line:
                warnings.append(f"Line {idx}: repaired indented hunk header")
            in_hunk = True
            out.append(stripped)
            continue
        if line.startswith("diff --git "):
            flush_hunk()
            old_path, new_path = parse_diff_git_paths(line)
            current_path = new_path or old_path
            target_lines = load_target_lines(root, current_path)
            in_hunk = False
            out.append(line)
            continue
        # NOTE: header checks run even while inside a hunk, so a hunk line
        # that starts with "--- " or "+++ " ends the hunk here.
        if line.startswith("--- "):
            flush_hunk()
            old_path = clean_path(line[4:])
            # The old-side path is only a fallback when nothing is known yet.
            if current_path is None:
                current_path = old_path
            target_lines = load_target_lines(root, current_path)
            in_hunk = False
            out.append(line)
            continue
        if line.startswith("+++ "):
            flush_hunk()
            new_path = clean_path(line[4:])
            # Prefer the new-side path; keep the previous one for /dev/null.
            current_path = new_path or current_path
            target_lines = load_target_lines(root, current_path)
            in_hunk = False
            out.append(line)
            continue
        # Headers that start with @@ but did not match looks_like_hunk_header.
        if line.startswith("@@"):
            flush_hunk()
            in_hunk = True
            out.append(line)
            continue
        if in_hunk:
            hunk_buffer.append((idx, line))
        else:
            out.append(line)
    flush_hunk()
    return "\n".join(out) + "\n", warnings
| def strip_one_diff_marker(line: str) -> tuple[str, str] | None: | |
| """ | |
| Strip exactly one unified-diff marker from a hunk line. | |
| Important Markdown behavior: | |
| raw: '- - bullet' -> kind remove, text '- bullet' after optional post-marker space normalization. | |
| raw: '+ - bullet' -> kind add, text '- bullet'. | |
| raw: ' - bullet' -> kind context,text '- bullet'. | |
| We only call this while inside a parsed hunk. | |
| """ | |
| if not line: | |
| return None | |
| marker = line[0] | |
| if marker not in {"+", "-", " "}: | |
| return None | |
| text = line[1:] | |
| # AI diffs commonly include a readability space after +/-. In real unified | |
| # diffs that space is part of content, but for Markdown bullets we want: | |
| # '- - item' -> '- item', not ' - item'. | |
| # Keep leading spaces for indented code except in the specific marker-space-bullet case. | |
| if marker in {"+", "-"} and text.startswith(" - "): | |
| text = text[1:] | |
| elif marker in {"+", "-"} and text.startswith(" * "): | |
| text = text[1:] | |
| elif marker in {"+", "-"} and text.startswith(" + "): | |
| text = text[1:] | |
| elif marker in {"+", "-"} and text.startswith(" ") and not text.startswith(" "): | |
| # General AI convenience form: '- old line' means content 'old line'. | |
| # This is deliberately not applied to double-space indentation. | |
| text = text[1:] | |
| elif marker == " " and text.startswith(" - "): | |
| text = text[1:] | |
| elif marker == " " and text.startswith(" * "): | |
| text = text[1:] | |
| elif marker == " " and text.startswith(" + "): | |
| text = text[1:] | |
| kind: str | |
| if marker == "+": | |
| kind = "add" | |
| elif marker == "-": | |
| kind = "remove" | |
| else: | |
| kind = "context" | |
| return kind, text | |
def parse_patch(patch_text: str) -> tuple[list[PatchFile], list[str]]:
    """Parse (already repaired) patch text into per-file hunk lists.

    Recognizes `diff --git` lines, `---`/`+++` headers, apply_patch-style
    `*** Update|Add|Delete File:` markers and `@@` hunk headers. Hunk line
    numbers are not interpreted here.

    Returns:
        (files that have at least one hunk, global warnings). Hunks are
        passed through split_large_ai_hunks before returning.
    """
    warnings: list[str] = []
    files: list[PatchFile] = []
    current: PatchFile | None = None
    current_hunk: Hunk | None = None
    in_hunk = False

    def finish_empty_file_if_needed() -> None:
        # Record the file being built once it has a path or hunks; the
        # membership test prevents recording the same object twice.
        nonlocal current
        if current and (current.hunks or current.old_path or current.new_path):
            if current not in files:
                files.append(current)

    def start_file(old_path: str | None = None, new_path: str | None = None) -> PatchFile:
        # Close the previous file entry and begin a new one outside any hunk.
        nonlocal current, current_hunk, in_hunk
        finish_empty_file_if_needed()
        current = PatchFile(old_path=old_path, new_path=new_path)
        current_hunk = None
        in_hunk = False
        return current

    lines = patch_text.splitlines()
    for idx, line in enumerate(lines, start=1):
        stripped = line.strip()
        # Ignore outer fenced-code boundaries if someone saved a markdown reply as a patch.
        # Do not ignore them inside hunks: Markdown files often contain real ``` fences.
        if not in_hunk and stripped.startswith("```"):
            continue
        if not in_hunk and stripped in {"*** Begin Patch", "*** End Patch"}:
            continue
        if line.startswith("diff --git "):
            old_path, new_path = parse_diff_git_paths(line)
            start_file(old_path=old_path, new_path=new_path)
            continue
        # Support common AI/apply_patch style file markers enough to recover the path.
        m = re.match(r"^\*\*\*\s+(Update|Add|Delete) File:\s+(.+)$", line)
        if m:
            path = clean_path(m.group(2))
            if m.group(1) == "Add":
                start_file(old_path=None, new_path=path)
            elif m.group(1) == "Delete":
                start_file(old_path=path, new_path=None)
            else:
                start_file(old_path=path, new_path=path)
            continue
        if line.startswith("--- "):
            path = clean_path(line[4:])
            # A `---` after hunks were collected starts the next file; before
            # that it only fills in the old path of the file just opened.
            if current is None or current.hunks:
                current = start_file(old_path=path)
            else:
                current.old_path = path
            in_hunk = False
            current_hunk = None
            continue
        if line.startswith("+++ "):
            path = clean_path(line[4:])
            if current is None:
                current = start_file(new_path=path)
            else:
                current.new_path = path
            in_hunk = False
            current_hunk = None
            continue
        if line.startswith("@@"):
            if current is None:
                current = start_file()
                current.parse_warnings.append(f"Line {idx}: hunk found before file path")
            current_hunk = Hunk(header=line)
            current.hunks.append(current_hunk)
            in_hunk = True
            continue
        if line.startswith("\\ No newline at end of file"):
            continue
        if in_hunk and current_hunk is not None:
            # NOTE(review): this branch looks unreachable, because lines
            # starting with "--- " or "+++ " are consumed by the header
            # handling above before control gets here.
            if line.startswith("+++ ") or line.startswith("--- "):
                # Defensive; file headers should already have reset hunk state.
                # Bare `---` is a valid Markdown horizontal rule and must remain hunk content.
                current_hunk.parse_warnings.append(f"Line {idx}: header-looking line inside hunk: {line}")
                in_hunk = False
                current_hunk = None
                continue
            parsed = strip_one_diff_marker(line)
            if parsed is None:
                # AI often emits unmarked hunk context lines. Treat as context, but warn.
                current_hunk.lines.append(DiffLine("context", line, line))
                if line:
                    current_hunk.parse_warnings.append(
                        f"Line {idx}: unmarked line inside hunk treated as context"
                    )
                else:
                    current_hunk.parse_warnings.append(
                        f"Line {idx}: blank unmarked line inside hunk treated as context"
                    )
            else:
                kind, text = parsed
                current_hunk.lines.append(DiffLine(kind, text, line))
            continue
        # Outside hunks, do nothing. Important: '-' outside a hunk is never deletion.
    finish_empty_file_if_needed()
    # Drop file entries with no hunks; they are not actionable for this v1.
    actionable = [pf for pf in files if pf.hunks]
    for pf in actionable:
        if not pf.target_path:
            pf.parse_warnings.append("No target path found for file diff")
    if not actionable:
        warnings.append("No actionable hunks found. Is this a unified diff or AI patch?")
    # Break multi-edit hunks apart so one stale edit cannot block the others.
    for pf in actionable:
        pf.hunks = split_large_ai_hunks(pf.hunks)
    return actionable, warnings
def old_new_blocks(hunk: Hunk) -> tuple[list[str], list[str], list[str], list[str], list[str]]:
    """Project a hunk onto its old side, its new side and each line kind.

    Returns a 5-tuple, every element in hunk order:
        old_block:    context + removed lines (what the file should contain)
        new_block:    context + added lines (what it should become)
        removed_only: removed lines
        added_only:   added lines
        context_only: context lines
    """
    entries = hunk.lines

    def texts(*kinds: str) -> list[str]:
        return [entry.text for entry in entries if entry.kind in kinds]

    return (
        texts("context", "remove"),
        texts("context", "add"),
        texts("remove"),
        texts("add"),
        texts("context"),
    )
def split_large_ai_hunk(hunk: Hunk, max_hunk_lines: int = 24, context_radius: int = 1) -> list[Hunk]:
    """Split one hunk into independent change groups.

    AI diffs often bundle several unrelated edits into a single hunk. To
    keep one stale edit from poisoning the rest, changes separated by more
    than `context_radius` context lines are placed in separate hunks, each
    padded with up to `context_radius` lines on both sides. Overlapping
    padded groups are merged back together.

    Any hunk with several separated change groups is split, whatever its
    size; `max_hunk_lines` is accepted but not consulted.

    Returns `[hunk]` unchanged when there is at most one resulting group.
    """
    total = len(hunk.lines)
    changed = [
        pos for pos, entry in enumerate(hunk.lines)
        if entry.kind in {"add", "remove"}
    ]
    if len(changed) < 2:
        return [hunk]

    def padded(first_change: int, last_change: int) -> tuple[int, int]:
        # Half-open slice bounds around a run of changes, clamped to the hunk.
        lo = max(0, first_change - context_radius)
        hi = min(total, last_change + context_radius + 1)
        return lo, hi

    # Pass 1: group changes into runs; a gap wider than the radius ends a run.
    windows: list[tuple[int, int]] = []
    first = last = changed[0]
    for pos in changed[1:]:
        if pos - last > context_radius + 1:
            windows.append(padded(first, last))
            first = pos
        last = pos
    windows.append(padded(first, last))

    # Pass 2: merge windows that overlap or touch after padding.
    groups: list[tuple[int, int]] = []
    for lo, hi in windows:
        if groups and lo <= groups[-1][1]:
            kept_lo, kept_hi = groups[-1]
            groups[-1] = (kept_lo, max(kept_hi, hi))
        else:
            groups.append((lo, hi))

    if len(groups) < 2:
        return [hunk]

    pieces: list[Hunk] = []
    group_count = len(groups)
    for number, (lo, hi) in enumerate(groups, start=1):
        body = hunk.lines[lo:hi]
        if all(entry.kind not in {"add", "remove"} for entry in body):
            continue
        pieces.append(
            Hunk(
                header=f"{hunk.header or '@@'} [smartpatch split {number}/{group_count}]",
                lines=body,
                parse_warnings=list(hunk.parse_warnings)
                + [f"AI hunk split into {group_count} smaller change group(s)."],
            )
        )
    return pieces if pieces else [hunk]
def split_large_ai_hunks(hunks: list[Hunk]) -> list[Hunk]:
    """Apply split_large_ai_hunk to every hunk and flatten the results in order."""
    return [piece for hunk in hunks for piece in split_large_ai_hunk(hunk)]
def find_exact_block(file_lines: list[str], block: list[str]) -> list[tuple[int, int]]:
    """Find every exact occurrence of *block* as consecutive file lines.

    Returns (start, end) pairs, 0-based with exclusive end, in ascending
    order; overlapping occurrences are all reported. An empty block matches
    nowhere.
    """
    if not block:
        return []
    width = len(block)
    last_start = len(file_lines) - width
    return [
        (start, start + width)
        for start in range(last_start + 1)
        if file_lines[start : start + width] == block
    ]
def find_normalized_block(file_lines: list[str], block: list[str]) -> list[tuple[int, int]]:
    """Find occurrences of *block* after whitespace-normalizing both sides.

    Returns (start, end) pairs, 0-based with exclusive end, in ascending
    order. An empty block matches nowhere.
    """
    if not block:
        return []
    haystack = normalize_block(file_lines)
    needle = normalize_block(block)
    width = len(needle)
    return [
        (start, start + width)
        for start in range(len(haystack) - width + 1)
        if haystack[start : start + width] == needle
    ]
def is_placeholder(line: str) -> bool:
    """Tell whether the stripped line is an elision marker.

    Recognized markers are the entries of PLACEHOLDER_LINES plus the
    comment-style forms `// ...`, `# ...` and `<!-- ... -->`.
    """
    token = line.strip()
    if token in PLACEHOLDER_LINES:
        return True
    return token in {"// ...", "# ...", "<!-- ... -->"}
def markdown_bullet_variants(line: str) -> list[str]:
    """Return *line* plus bulleted spellings of it, for sloppy Markdown diffs.

    When the line (ignoring indentation) already starts with a list marker
    the result is just `[line]`. Otherwise `- ` and `* ` variants are added
    after the original, keeping the original indentation.
    """
    body = line.lstrip()
    if re.match(r"^([-*+]\s+|\d+[.)]\s+)", body):
        return [line]
    indent = line[: len(line) - len(body)]
    spellings = [line, f"{indent}- {body}", f"{indent}* {body}"]
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(spellings))
def expand_markdown_old_block_variants(block: list[str], path: Path, enable: bool) -> list[list[str]]:
    """Return alternative spellings of an old block for Markdown targets.

    Each line is expanded with markdown_bullet_variants and every
    combination is produced, original spelling first, without duplicates.
    The result is just `[block]` when the feature is disabled, the target is
    not Markdown, the block is empty, or the block has more than three lines
    (larger combinatorial expansions are risky and noisy).
    """
    applies = (
        enable
        and path.suffix.lower() in MARKDOWN_EXTENSIONS
        and bool(block)
        and len(block) <= 3
    )
    if not applies:
        return [block]

    combos: list[list[str]] = [[]]
    for line in block:
        spellings = markdown_bullet_variants(line)
        combos = [so_far + [spelling] for so_far in combos for spelling in spellings]

    # Order-preserving de-duplication keyed on the line tuple.
    distinct = dict.fromkeys(tuple(combo) for combo in combos)
    return [list(combo) for combo in distinct]
def tail_anchor(line: str, words: int = 8) -> str | None:
    """Return the last ``words`` whitespace-separated tokens of ``line``.

    Tokens are re-joined with single spaces. Lines with fewer than four
    tokens yield ``None``.
    """
    tokens = re.findall(r"\S+", line.strip())
    return " ".join(tokens[-words:]) if len(tokens) >= 4 else None
def fuzzy_window_candidates(file_lines: list[str], old_block: list[str]) -> list[tuple[int, int, float]]:
    """Conservative fuzzy fallback: score file windows against ``old_block``.

    Every window of the file whose length is within two lines of the block
    length is normalized with ``normalize_block``, joined with newlines and
    compared to the normalized block via ``difflib.SequenceMatcher``. Windows
    scoring at least 0.82 are kept.

    Returns up to ten ``(start, end, score)`` tuples (``end`` exclusive),
    best score first. Empty blocks and blocks longer than 30 lines return
    ``[]``.
    """
    if not old_block:
        return []
    block_len = len(old_block)
    if block_len > 30:
        return []

    wanted = "\n".join(normalize_block(old_block))
    total = len(file_lines)
    # Allow a small window-size wiggle for AI context drift.
    smallest = max(1, block_len - 2)
    largest = min(total, block_len + 2)

    hits: list[tuple[int, int, float]] = []
    for width in range(smallest, largest + 1):
        for begin in range(total - width + 1):
            finish = begin + width
            seen = "\n".join(normalize_block(file_lines[begin:finish]))
            ratio = difflib.SequenceMatcher(None, wanted, seen).ratio()
            if ratio >= 0.82:
                hits.append((begin, finish, ratio))

    # sorted() is stable, so equally scored windows keep scan order.
    return sorted(hits, key=lambda hit: hit[2], reverse=True)[:10]
def parse_hunk_old_start(header: str | None) -> int | None:
    """Extract the old-file start line of a unified diff hunk header, 0-based.

    Example:
        @@ -34,14 +40,43 @@
        -> 33

    The number is clamped at zero. ``None`` is returned when the header is
    missing/empty or has no ``@@ -<digits>`` part.

    Hunk line numbers are not trusted as primary evidence, but they are useful
    as a tie-breaker when fuzzy candidates are otherwise equally strong.
    """
    found = re.search(r"@@\s+-(?P<start>\d+)", header) if header else None
    if found is None:
        return None
    one_based = int(found.group("start"))
    return max(0, one_based - 1)
def hunk_header_context_text(header: str | None) -> str | None:
    """Return trailing context text from a unified diff hunk header.

    Example:
        @@ -210,7 +284,7 @@ The customer should not need...
        -> "The customer should not need..."

    A trailing ``[smartpatch split i/n]`` marker is removed first. When the
    marker is the only thing after the second ``@@`` the header carries no
    context and ``None`` is returned, instead of leaking the marker as a
    bogus anchor. ``None`` is also returned for a missing header or one
    without a second ``@@``.

    This is weak evidence, but useful as a bounded anchor for malformed AI
    tail hunks that have no explicit context lines left after repair/splitting.
    """
    if not header:
        return None
    parts = header.split("@@", 2)
    if len(parts) < 3:
        return None
    # Strip the split suffix BEFORE trimming whitespace. The marker may be the
    # entire trailing text, so it must also match at the very start.
    text = re.sub(r"(?:^|\s+)\[smartpatch split \d+/\d+\]\s*$", "", parts[2]).strip()
    return text or None
def header_guided_fuzzy_candidate(
    candidates: list[tuple[int, int, float]],
    hunk: Hunk,
    score_floor: float = 0.94,
    max_distance: int = 120,
) -> tuple[int, int, float] | None:
    """Pick the fuzzy candidate closest to the hunk header's old-start line.

    Meant for the case where fuzzy candidates are strong but not unique: the
    header line number acts as a rough locality hint once content matching
    has already narrowed the field.

    Conservative limits:
    - the best score must reach ``score_floor``
    - the header must yield an old-start line
    - only candidates within 0.03 of the best score compete
    - the nearest competitor must lie within ``max_distance`` lines
    - a nearest competitor scoring >= 0.99 wins outright
    - otherwise it must be strictly nearer than the runner-up

    Returns the chosen ``(start, end, score)`` tuple, or ``None``.
    """
    if not candidates:
        return None
    header_line = parse_hunk_old_start(hunk.header)
    if header_line is None:
        return None

    top_score = max(cand[2] for cand in candidates)
    if top_score < score_floor:
        return None

    def gap(cand: tuple[int, int, float]) -> int:
        return abs(cand[0] - header_line)

    contenders = sorted(
        (cand for cand in candidates if top_score - cand[2] < 0.03),
        key=gap,
    )
    if not contenders:
        return None

    nearest = contenders[0]
    nearest_gap = gap(nearest)
    if nearest_gap > max_distance:
        return None

    # Near-perfect duplicate fuzzy windows are usually repeated Markdown
    # fragments; the header is a good enough tie-breaker for those.
    if nearest[2] >= 0.99:
        return nearest

    # Lower scores need a strictly nearest candidate.
    if len(contenders) > 1 and gap(contenders[1]) == nearest_gap:
        return None
    return nearest
def line_presence_missing(file_lines: list[str], expected_lines: list[str], path: Path, markdown_recovery: bool) -> list[str]:
    """List the expected lines that do not occur anywhere in the file.

    Comparison is done on ``normalize_line`` output. Placeholder lines (see
    ``is_placeholder``) are skipped. With ``markdown_recovery`` on a Markdown
    file, a line also counts as present when any of its
    ``markdown_bullet_variants`` occurs.
    """
    known = set(normalize_block(file_lines))
    absent: list[str] = []
    for expected in expected_lines:
        if is_placeholder(expected):
            continue
        if markdown_recovery and path.suffix.lower() in MARKDOWN_EXTENSIONS:
            spellings = markdown_bullet_variants(expected)
        else:
            spellings = [expected]
        present = False
        for spelling in spellings:
            if normalize_line(spelling) in known:
                present = True
                break
        if not present:
            absent.append(expected)
    return absent
def markdown_or_normal_similarity(a: str, b: str, path: Path) -> float:
    """Score how alike two lines are, as a ``SequenceMatcher`` ratio.

    For Markdown files, when the optional ``markdown_bullet_body`` helper
    exists at module level and returns a body for both lines, the bodies are
    compared so the bullet markers themselves do not affect the score.
    Otherwise the ``normalize_line`` forms of the two lines are compared.
    """
    markdown_aware = (
        path.suffix.lower() in MARKDOWN_EXTENSIONS
        and "markdown_bullet_body" in globals()
    )
    if markdown_aware:
        body_a = markdown_bullet_body(a)
        body_b = markdown_bullet_body(b)
        if not (body_a is None or body_b is None):
            return difflib.SequenceMatcher(None, body_a, body_b).ratio()
    left = normalize_line(a)
    right = normalize_line(b)
    return difflib.SequenceMatcher(None, left, right).ratio()
def find_unique_fuzzy_line(
    file_lines: list[str],
    target_line: str,
    path: Path,
    min_score: float = 0.86,
) -> tuple[int, float] | None:
    """Find the single file line that closely resembles a stale old line.

    Lines are scored with ``markdown_or_normal_similarity``; those below
    ``min_score`` are ignored. The best line is returned as
    ``(index, score)`` only when no other qualifying line scores within 0.03
    of it; otherwise the result is ``None``.
    """
    scored: list[tuple[int, float]] = []
    for idx, text in enumerate(file_lines):
        ratio = markdown_or_normal_similarity(text, target_line, path)
        if ratio >= min_score:
            scored.append((idx, ratio))
    if not scored:
        return None

    # max() returns the first maximal entry, i.e. the earliest line on ties.
    best_idx, best_ratio = max(scored, key=lambda pair: pair[1])
    rivals = sum(1 for _, ratio in scored if best_ratio - ratio < 0.03)
    if rivals != 1:
        return None
    return best_idx, best_ratio
def replacement_preserving_single_line_style(
    added_lines: list[str],
    matched_old_line: str,
    path: Path,
) -> list[str]:
    """Make added bullets reuse the bullet marker of the line they replace.

    Only acts on Markdown files, only when the optional
    ``markdown_bullet_marker`` helper exists at module level, and only when
    the matched old line carries a ``-``, ``*`` or ``+`` marker. In every
    other case ``added_lines`` is returned as-is. Added lines that are not
    bullets themselves are left unchanged.
    """
    bullet_markers = {"-", "*", "+"}
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return added_lines
    if "markdown_bullet_marker" not in globals():
        return added_lines
    target_marker = markdown_bullet_marker(matched_old_line)
    if target_marker not in bullet_markers:
        return added_lines
    return [
        rewrite_markdown_bullet_marker(text, target_marker)
        if markdown_bullet_marker(text) in bullet_markers
        else text
        for text in added_lines
    ]
def find_unique_context_anchor(
    file_lines: list[str],
    anchor: str,
    path: Path,
) -> tuple[int, str] | None:
    """Resolve ``anchor`` to exactly one file line, trying ever looser matching.

    Stages, in order; the first stage with exactly one hit wins:
    1. ``"exact"``: identical string
    2. ``"normalized"``: equal after ``normalize_line``
    3. ``"markdown_equiv"``: Markdown files only, via the optional
       ``markdown_lines_equiv`` helper
    4. ``"fuzzy"``: ``find_unique_fuzzy_line`` with ``min_score=0.90``

    Returns ``(line_index, stage_name)`` or ``None`` when no stage resolves
    uniquely.
    """

    def sole(hits: list[int]) -> int | None:
        return hits[0] if len(hits) == 1 else None

    hit = sole([idx for idx, text in enumerate(file_lines) if text == anchor])
    if hit is not None:
        return hit, "exact"

    wanted = normalize_line(anchor)
    hit = sole([idx for idx, text in enumerate(file_lines) if normalize_line(text) == wanted])
    if hit is not None:
        return hit, "normalized"

    if path.suffix.lower() in MARKDOWN_EXTENSIONS and "markdown_lines_equiv" in globals():
        hit = sole([idx for idx, text in enumerate(file_lines) if markdown_lines_equiv(text, anchor)])
        if hit is not None:
            return hit, "markdown_equiv"

    fuzzy = find_unique_fuzzy_line(file_lines, anchor, path, min_score=0.90)
    if fuzzy is None:
        return None
    return fuzzy[0], "fuzzy"
def block_already_matches(
    current: list[str],
    expected: list[str],
    path: Path,
) -> bool:
    """Decide whether ``current`` already equals the ``expected`` content.

    True when the blocks are identical, identical after ``normalize_block``,
    or, for Markdown files with the optional ``markdown_lines_equiv`` helper
    available, have the same length and are pairwise Markdown-equivalent.
    """
    if current == expected or normalize_block(current) == normalize_block(expected):
        return True
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return False
    if "markdown_lines_equiv" not in globals():
        return False
    if len(current) != len(expected):
        return False
    return all(markdown_lines_equiv(have, want) for have, want in zip(current, expected))
def infer_local_bullet_marker(lines: list[str], path: Path) -> str | None:
    """Return the bullet marker of the first bulleted line in ``lines``.

    Only ``-``, ``*`` and ``+`` count. Yields ``None`` for non-Markdown
    files, when the optional ``markdown_bullet_marker`` helper is not defined
    at module level, or when no line carries such a marker.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS or "markdown_bullet_marker" not in globals():
        return None
    markers = (markdown_bullet_marker(text) for text in lines)
    return next((found for found in markers if found in {"-", "*", "+"}), None)
def rewrite_added_bullets_to_local_style(
    lines: list[str],
    marker: str | None,
    path: Path,
) -> list[str]:
    """Rewrite ``-``/``*``/``+`` bullets in ``lines`` to use ``marker``.

    ``lines`` is returned untouched when ``marker`` is falsy, the file is not
    Markdown, or the optional ``markdown_bullet_marker`` helper is not
    defined at module level. Non-bullet lines pass through unchanged.
    """
    if not marker:
        return lines
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return lines
    if "markdown_bullet_marker" not in globals():
        return lines
    return [
        rewrite_markdown_bullet_marker(text, marker)
        if markdown_bullet_marker(text) in {"-", "*", "+"}
        else text
        for text in lines
    ]
def hunk_change_bounds(hunk: Hunk) -> tuple[int, int] | None:
    """Return the indexes of the first and last add/remove line in the hunk.

    Indexes refer to ``hunk.lines``. ``None`` means the hunk contains no
    added or removed line at all.
    """
    first: int | None = None
    last: int | None = None
    for pos, entry in enumerate(hunk.lines):
        if entry.kind in {"add", "remove"}:
            if first is None:
                first = pos
            last = pos
    if first is None:
        return None
    return first, last
def nearest_context_before(hunk: Hunk, index: int) -> tuple[int, str] | None:
    """Find the closest non-blank context line above ``hunk.lines[index]``.

    Returns ``(hunk_line_index, text)`` or ``None`` when there is none.
    """
    for pos in reversed(range(index)):
        entry = hunk.lines[pos]
        if entry.kind == "context" and entry.text.strip():
            return pos, entry.text
    return None
def nearest_context_after(hunk: Hunk, index: int) -> tuple[int, str] | None:
    """Find the closest non-blank context line below ``hunk.lines[index]``.

    Returns ``(hunk_line_index, text)`` or ``None`` when there is none.
    """
    pos = index + 1
    total = len(hunk.lines)
    while pos < total:
        entry = hunk.lines[pos]
        if entry.kind == "context" and entry.text.strip():
            return pos, entry.text
        pos += 1
    return None
def hunk_new_region_lines(
    hunk: Hunk,
    start: int,
    end: int,
    path: Path,
    preferred_marker: str | None,
) -> list[str]:
    """Build the post-patch text of ``hunk.lines[start:end+1]``.

    Context and added lines are kept in order; removed lines are dropped.
    The result is then passed through
    ``rewrite_added_bullets_to_local_style`` with ``preferred_marker``.
    """
    kept = [
        entry.text
        for entry in hunk.lines[start : end + 1]
        if entry.kind in {"context", "add"}
    ]
    return rewrite_added_bullets_to_local_style(kept, preferred_marker, path)
def locate_context_bracket_atom(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    warnings: list[str],
) -> tuple[MatchResult, list[str]] | None:
    """Replace the changed hunk region between two unique context anchors.

    The nearest non-blank context line before the first change and after the
    last change are each resolved to a unique file line with
    ``find_unique_context_anchor``. Everything in the file strictly between
    those two lines is the span to replace.

    The replacement is the new version of the hunk lines strictly between the
    same two anchors. That deliberately includes blank context lines sitting
    between an anchor and the first/last change: the file span covers them,
    so building the replacement only from first change to last change would
    silently delete those blank separators and would make the
    already-applied check fail whenever such separators exist.

    Args:
        file_lines: Current target file content, one entry per line.
        hunk: Hunk to place.
        path: Target path; its suffix drives the Markdown-specific handling.
        warnings: Warnings collected so far. A message is appended in place
            when the strategy is skipped for size reasons; the list is also
            copied into the returned result.

    Returns:
        ``(MatchResult, replacement_lines)``, or ``None`` when the strategy
        does not apply (no change, missing or non-unique anchor, anchors out
        of order, or span/replacement too large). When the file already holds
        the replacement, the result has ``found=False`` and an
        ``already_applied_...`` method.
    """
    bounds = hunk_change_bounds(hunk)
    if bounds is None:
        return None
    first_change, last_change = bounds

    prev_ctx = nearest_context_before(hunk, first_change)
    next_ctx = nearest_context_after(hunk, last_change)
    if not prev_ctx or not next_ctx:
        return None
    prev_hunk_index, prev_text = prev_ctx
    next_hunk_index, next_text = next_ctx

    found_prev = find_unique_context_anchor(file_lines, prev_text, path)
    found_next = find_unique_context_anchor(file_lines, next_text, path)
    if not found_prev or not found_next:
        return None
    prev_line, prev_method = found_prev
    next_line, next_method = found_next
    if prev_line >= next_line:
        return None

    replace_start = prev_line + 1
    replace_end = next_line
    existing_span = replace_end - replace_start
    if existing_span > 80:
        warnings.append(
            f"Context-bracket span too large ({existing_span} lines); skipped atom strategy."
        )
        return None

    local_marker = infer_local_bullet_marker(
        file_lines[max(0, prev_line - 3) : min(len(file_lines), next_line + 4)],
        path,
    )
    # Lines between an anchor and the nearest change are blank context only
    # (the anchors are the NEAREST non-blank context lines), so widening the
    # region to the anchors adds just those blank separators.
    replacement = hunk_new_region_lines(
        hunk=hunk,
        start=prev_hunk_index + 1,
        end=next_hunk_index - 1,
        path=path,
        preferred_marker=local_marker,
    )
    if len(replacement) > 140:
        warnings.append(
            f"Context-bracket replacement too large ({len(replacement)} lines); skipped atom strategy."
        )
        return None

    current_inner = file_lines[replace_start:replace_end]
    if block_already_matches(current_inner, replacement, path):
        return MatchResult(
            found=False,
            confidence=0.96,
            method="already_applied_context_bracket_atom",
            warnings=warnings + [
                f"Context-bracket atom already matches between unique anchors ({prev_method}/{next_method})."
            ],
            missing_lines=[],
            candidate_count=1,
        ), replacement

    return MatchResult(
        True,
        replace_start,
        replace_end,
        0.93,
        f"context_bracket_atom_{prev_method}_{next_method}",
        warnings + [
            f"Context-bracket atom used between unique anchors ({prev_method}/{next_method})."
        ],
        [],
        1,
    ), replacement
def find_best_fuzzy_line_in_range(
    file_lines: list[str],
    target_line: str,
    path: Path,
    start: int,
    end: int,
    min_score: float = 0.74,
) -> tuple[int, float] | None:
    """Find the single best fuzzy match for ``target_line`` in ``[start, end)``.

    The range is clamped to the file. Lines are scored with
    ``markdown_or_normal_similarity`` and must reach ``min_score``. The best
    line is returned as ``(index, score)`` only when no other qualifying line
    in the range scores within 0.03 of it; otherwise ``None``.
    """
    lower = max(0, start)
    upper = min(len(file_lines), end)

    scored: list[tuple[int, float]] = []
    for idx in range(lower, upper):
        ratio = markdown_or_normal_similarity(file_lines[idx], target_line, path)
        if ratio >= min_score:
            scored.append((idx, ratio))
    if not scored:
        return None

    # max() returns the first maximal entry, i.e. the earliest line on ties.
    best_idx, best_ratio = max(scored, key=lambda pair: pair[1])
    rivals = sum(1 for _, ratio in scored if best_ratio - ratio < 0.03)
    if rivals != 1:
        return None
    return best_idx, best_ratio
def is_numbered_markdown_item(line: str) -> bool:
    """Tell whether ``line`` starts an ordered-list item such as ``1. x`` or ``2) x``.

    Surrounding whitespace is ignored; the marker must be followed by
    whitespace and therefore by some item text.
    """
    trimmed = line.strip()
    return re.match(r"^\s*\d+[.)]\s+", trimmed) is not None
def text_token_set(lines: list[str]) -> set[str]:
    """Collect the distinctive lowercase word tokens found in ``lines``.

    A token starts with a letter and continues with at least two more
    letters, digits, underscores or hyphens. A fixed list of common words is
    filtered out. Used for stale paragraph similarity checks.
    """
    ignored = {
        "the", "and", "for", "that", "this", "with", "from", "into", "they",
        "must", "not", "may", "still", "should", "one", "item", "entry",
        "link", "links", "code", "codes",
    }
    lowered = " ".join(lines).lower()
    found = set(re.findall(r"[a-z][a-z0-9_-]{2,}", lowered))
    return found - ignored
def joined_similarity(a: list[str], b: list[str]) -> float:
    """Compare two line groups as single strings.

    Each group is joined with spaces and passed through ``normalize_line``;
    the result is the ``SequenceMatcher`` ratio of the two strings.
    """
    left = normalize_line(" ".join(a))
    right = normalize_line(" ".join(b))
    matcher = difflib.SequenceMatcher(None, left, right)
    return matcher.ratio()
def strip_outer_blank_lines(lines: list[str]) -> list[str]:
    """Return a copy of ``lines`` without leading and trailing blank lines.

    A line is blank when it is empty or whitespace-only. Blank lines in the
    interior are kept; the input is not modified.
    """
    head = 0
    tail = len(lines)
    while head < tail and not lines[head].strip():
        head += 1
    while tail > head and not lines[tail - 1].strip():
        tail -= 1
    return list(lines[head:tail])
def locate_collapsed_stale_paragraph_atom(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    warnings: list[str],
) -> tuple[MatchResult, list[str]] | None:
    """Replace a stale multi-line paragraph currently collapsed into fewer lines.

    Handles Markdown docs where an AI patch expects to remove several lines::

        -old line 1
        -old line 2
        -old line 3
        -old line 4

    but the target has the same semantic paragraph collapsed into one long line.

    Conservative limits:
    - Markdown only
    - removed + added paragraph hunk (2-8 removed, 2-10 added lines)
    - explicit context is blank-only
    - replacement of at most 14 lines after trimming outer blank lines
    - search bounded around hunk header old-start (140 lines before, 180 after)
    - windows containing headings, code fences or list items are skipped
    - unique best local window by joined text similarity (>= 0.62), or, among
      near-ties, the one strictly nearest to the header line

    Args:
        file_lines: Current target file content, one entry per line.
        hunk: Hunk to place.
        path: Target path; only Markdown suffixes are handled.
        warnings: Warnings collected so far; copied into the returned result.

    Returns:
        ``(MatchResult, replacement_lines)`` or ``None`` when the strategy
        does not apply. When the chosen window already holds the replacement
        the result has ``found=False`` and an ``already_applied_...`` method.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return None
    _old_block, _new_block, removed_only, added_only, context_only = old_new_blocks(hunk)
    if not removed_only or not added_only:
        return None
    # This is specifically for blank-context paragraph replacements.
    if any(line.strip() for line in context_only):
        return None
    if len(removed_only) < 2 or len(removed_only) > 8:
        return None
    if len(added_only) < 2 or len(added_only) > 10:
        return None

    old_start = parse_hunk_old_start(hunk.header)
    if old_start is None:
        return None
    bounds = hunk_change_bounds(hunk)
    if bounds is None:
        return None
    first_change, last_change = bounds

    replacement = strip_outer_blank_lines(
        hunk_new_region_lines(
            hunk=hunk,
            start=first_change,
            end=last_change,
            path=path,
            preferred_marker=None,
        )
    )
    if not replacement or len(replacement) > 14:
        return None

    search_start = max(0, old_start - 140)
    search_end = min(len(file_lines), old_start + 180)
    # Each candidate is (score, start, end, size).
    candidates: list[tuple[float, int, int, int]] = []
    n = len(removed_only)
    for size in range(1, min(8, n + 2) + 1):
        for start in range(search_start, max(search_start, search_end - size + 1)):
            end = start + size
            window = file_lines[start:end]
            if not any(line.strip() for line in window):
                continue
            stripped_nonblank = [line.strip() for line in window if line.strip()]
            # Paragraph windows only: skip headings, code fences and list items.
            if any(line.startswith("#") for line in stripped_nonblank):
                continue
            if any(line.startswith("```") for line in stripped_nonblank):
                continue
            # Ordered items are detected by pattern rather than a fixed list of
            # prefixes, so "6. x" and "1) x" are recognized while prose that
            # merely starts with a number such as "1.5" is not.
            if any(
                line.startswith(("- ", "* ", "+ ")) or is_numbered_markdown_item(line)
                for line in stripped_nonblank
            ):
                continue
            sim = joined_similarity(window, removed_only)
            # Prefer compact/collapsed windows when similarity is equal.
            compact_bonus = 0.03 if size == 1 else 0.0
            score = sim + compact_bonus
            if sim >= 0.62:
                candidates.append((score, start, end, size))
    if not candidates:
        return None

    candidates.sort(key=lambda item: item[0], reverse=True)
    best_score, start, end, size = candidates[0]
    close = [c for c in candidates if best_score - c[0] < 0.025]
    # Require unique best, or uniquely nearest to hunk header.
    if len(close) > 1:
        ranked = sorted(close, key=lambda c: abs(c[1] - old_start))
        best_distance = abs(ranked[0][1] - old_start)
        second_distance = abs(ranked[1][1] - old_start)
        if best_distance == second_distance:
            return None
        best_score, start, end, size = ranked[0]

    current = file_lines[start:end]
    if block_already_matches(current, replacement, path):
        return MatchResult(
            found=False,
            confidence=0.96,
            method="already_applied_collapsed_stale_paragraph_atom",
            warnings=warnings + [
                f"Collapsed stale paragraph already matches; score={best_score:.2f}."
            ],
            missing_lines=[],
            candidate_count=len(candidates),
        ), replacement

    return MatchResult(
        True,
        start,
        end,
        0.91,
        "collapsed_stale_paragraph_atom",
        warnings + [
            f"Collapsed stale paragraph atom used; score={best_score:.2f}, span={size}, candidates={len(candidates)}."
        ],
        [],
        len(candidates),
    ), replacement
def locate_stale_paragraph_by_header_window_atom(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    warnings: list[str],
) -> tuple[MatchResult, list[str]] | None:
    """Replace a small stale paragraph near the hunk header line.

    This is for cases where:
    - the old paragraph text has drifted
    - explicit context anchors are absent
    - the hunk header line number is still close enough
    - the old/new paragraph is small
    - the local target window shares strong domain terms with removed text

    It is intentionally Markdown-only and bounded by hunk locality: windows
    are searched from 90 lines before to 140 lines after the header's
    old-start line. Windows containing a Markdown heading of any level are
    never candidates, so a section heading cannot be overwritten by a
    paragraph replacement.

    Args:
        file_lines: Current target file content, one entry per line.
        hunk: Hunk to place.
        path: Target path; only Markdown suffixes are handled.
        warnings: Warnings collected so far; copied into the returned result.

    Returns:
        ``(MatchResult, replacement_lines)`` or ``None`` when the strategy
        does not apply. When the chosen window already holds the replacement
        the result has ``found=False`` and an ``already_applied_...`` method.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return None
    _old_block, new_block, removed_only, added_only, context_only = old_new_blocks(hunk)
    if not removed_only or not added_only:
        return None
    # Do not hijack hunks that already have explicit context. Let bracket,
    # side-anchored, normalized, and other safer strategies handle those.
    if context_only:
        return None
    if len(removed_only) < 2 or len(removed_only) > 8:
        return None
    if len(added_only) > 14 or len(new_block) > 18:
        return None

    old_start = parse_hunk_old_start(hunk.header)
    if old_start is None:
        return None
    old_tokens = text_token_set(removed_only)
    if len(old_tokens) < 4:
        return None
    bounds = hunk_change_bounds(hunk)
    if bounds is None:
        return None
    first_change, last_change = bounds

    replacement = hunk_new_region_lines(
        hunk=hunk,
        start=first_change,
        end=last_change,
        path=path,
        preferred_marker=None,
    )
    if not replacement or len(replacement) > 18:
        return None

    n = len(removed_only)
    search_start = max(0, old_start - 90)
    search_end = min(len(file_lines), old_start + 140)
    # Each candidate is (score, start, end, sim, overlap, shared_term_count).
    candidates: list[tuple[float, int, int, float, float, int]] = []
    for size in range(max(1, n - 2), min(10, n + 3) + 1):
        for start in range(search_start, max(search_start, search_end - size + 1)):
            end = start + size
            current = file_lines[start:end]
            if not current:
                continue
            # Avoid obviously structural windows.
            nonblank = [line for line in current if line.strip()]
            if not nonblank:
                continue
            # Match ATX headings of every level ("#" .. "######"), not only
            # top-level "# " headings.
            if any(re.match(r"#{1,6}(?:\s|$)", line.lstrip()) for line in nonblank):
                continue

            sim = joined_similarity(current, removed_only)
            current_tokens = text_token_set(current)
            shared = old_tokens & current_tokens
            overlap = len(shared) / max(1, min(len(old_tokens), len(current_tokens)))
            # Score balances phrase similarity, token overlap, and locality.
            distance = abs(start - old_start)
            locality = max(0.0, 1.0 - (distance / 140.0))
            score = (sim * 0.50) + (overlap * 0.35) + (locality * 0.15)
            # Need enough evidence: either some phrase similarity, or strong
            # overlap on several shared terms.
            if sim < 0.32 and not (overlap >= 0.52 and len(shared) >= 5):
                continue
            candidates.append((score, start, end, sim, overlap, len(shared)))
    if not candidates:
        return None

    candidates.sort(key=lambda item: item[0], reverse=True)
    best = candidates[0]
    close = [c for c in candidates if best[0] - c[0] < 0.04]
    # Require a unique best candidate. If tied, choose only if nearest to hunk
    # header is uniquely nearest.
    if len(close) > 1:
        ranked_by_distance = sorted(close, key=lambda c: abs(c[1] - old_start))
        best_distance = abs(ranked_by_distance[0][1] - old_start)
        second_distance = abs(ranked_by_distance[1][1] - old_start)
        if best_distance == second_distance:
            return None
        best = ranked_by_distance[0]

    score, start, end, sim, overlap, shared_count = best
    current = file_lines[start:end]
    if block_already_matches(current, replacement, path):
        return MatchResult(
            found=False,
            confidence=0.96,
            method="already_applied_stale_paragraph_header_window_atom",
            warnings=warnings + [
                f"Header-window stale paragraph already matches; sim={sim:.2f}, overlap={overlap:.2f}, shared_terms={shared_count}."
            ],
            missing_lines=[],
            candidate_count=len(candidates),
        ), replacement

    return MatchResult(
        True,
        start,
        end,
        0.91,
        "stale_paragraph_header_window_atom",
        warnings + [
            f"Header-window stale paragraph atom used; sim={sim:.2f}, overlap={overlap:.2f}, shared_terms={shared_count}, candidates={len(candidates)}."
        ],
        [],
        len(candidates),
    ), replacement
def locate_stale_paragraph_after_anchor_atom(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    warnings: list[str],
) -> tuple[MatchResult, list[str]] | None:
    """Replace a small stale Markdown paragraph after a unique anchor.

    This is for cases where the old paragraph text has drifted enough that the
    literal removed lines no longer exist, but the hunk still clearly describes
    a bounded paragraph replacement.

    Conservative limits:
    - Markdown only
    - removed + added paragraph-style hunk
    - small old/new regions
    - unique previous context or hunk-header context anchor
    - bounded local replacement span
    - meaningful joined-text or token overlap with the stale old text

    Anchors are tried in order (previous context line first, then the hunk
    header text); the first one that yields an acceptable span wins. The
    ``next_method`` part of the reported method name is ``"none"`` unless the
    following context anchor was actually used to bound the span.

    Args:
        file_lines: Current target file content, one entry per line.
        hunk: Hunk to place.
        path: Target path; only Markdown suffixes are handled.
        warnings: Warnings collected so far; copied into the returned result.

    Returns:
        ``(MatchResult, replacement_lines)`` or ``None`` when no anchor gives
        an acceptable span. When the span already holds the replacement the
        result has ``found=False`` and an ``already_applied_...`` method.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return None
    _old_block, _new_block, removed_only, added_only, _context_only = old_new_blocks(hunk)
    if not removed_only or not added_only:
        return None
    if len(removed_only) < 2 or len(removed_only) > 8:
        return None
    if len(added_only) > 12:
        return None

    bounds = hunk_change_bounds(hunk)
    if bounds is None:
        return None
    first_change, last_change = bounds
    prev_ctx = nearest_context_before(hunk, first_change)
    next_ctx = nearest_context_after(hunk, last_change)

    # (anchor text, where it came from), in order of preference.
    anchor_candidates: list[tuple[str, str]] = []
    if prev_ctx and prev_ctx[1].strip():
        anchor_candidates.append((prev_ctx[1], "prev_context"))
    header_text = hunk_header_context_text(hunk.header)
    if header_text:
        anchor_candidates.append((header_text, "hunk_header"))
    if not anchor_candidates:
        return None

    replacement = hunk_new_region_lines(
        hunk=hunk,
        start=first_change,
        end=last_change,
        path=path,
        preferred_marker=None,
    )
    if not replacement or len(replacement) > 18:
        return None

    old_tokens = text_token_set(removed_only)
    if not old_tokens:
        return None

    for anchor_text, anchor_source in anchor_candidates:
        found_anchor = find_unique_context_anchor(file_lines, anchor_text, path)
        if not found_anchor and anchor_source == "hunk_header":
            # Header text is often abbreviated; allow a looser fuzzy match.
            fuzzy_anchor = find_unique_fuzzy_line(
                file_lines=file_lines,
                target_line=anchor_text,
                path=path,
                min_score=0.78,
            )
            if fuzzy_anchor:
                found_anchor = (fuzzy_anchor[0], "hunk_header_fuzzy")
        if not found_anchor:
            continue
        anchor_i, anchor_method = found_anchor

        # The paragraph starts at the first nonblank line after the anchor.
        start = anchor_i + 1
        while start < len(file_lines) and not file_lines[start].strip():
            start += 1
        if start >= len(file_lines):
            continue

        # Prefer explicit following context if it uniquely resolves after anchor.
        end: int | None = None
        next_method = "none"
        if next_ctx and next_ctx[1].strip():
            found_next = find_unique_context_anchor(file_lines, next_ctx[1], path)
            if found_next:
                next_i, found_next_method = found_next
                if anchor_i < next_i and (next_i - start) <= 16:
                    end = next_i
                    # Record the method only when the anchor really bounds
                    # the span; a rejected anchor must not appear in the
                    # reported method name.
                    next_method = found_next_method
        # Otherwise replace same-sized stale paragraph region.
        if end is None:
            end = min(len(file_lines), start + len(removed_only))
        if end <= start or (end - start) > 16:
            continue

        current = file_lines[start:end]
        if not current:
            continue
        sim = joined_similarity(current, removed_only)
        current_tokens = text_token_set(current)
        shared = old_tokens & current_tokens
        overlap = len(shared) / max(1, min(len(old_tokens), len(current_tokens)))

        # Bounded but not blind.
        #
        # Require some actual phrase similarity. Token overlap alone can be too
        # permissive for docs that repeat domain terms like Product, Offer,
        # Purchase Entry, Shop QR, etc.
        #
        # Exception: hunk-header anchored matches may pass with slightly lower
        # phrase similarity if token overlap is very strong.
        if sim < 0.34:
            if not (
                anchor_source == "hunk_header"
                and sim >= 0.28
                and overlap >= 0.58
                and len(shared) >= 8
            ):
                continue

        if block_already_matches(current, replacement, path):
            return MatchResult(
                found=False,
                confidence=0.96,
                method="already_applied_stale_paragraph_after_anchor_atom",
                warnings=warnings + [
                    f"Stale paragraph after anchor already matches ({anchor_source}/{anchor_method})."
                ],
                missing_lines=[],
                candidate_count=1,
            ), replacement

        return MatchResult(
            True,
            start,
            end,
            0.91,
            f"stale_paragraph_after_anchor_atom_{anchor_source}_{anchor_method}_{next_method}",
            warnings + [
                f"Stale paragraph atom used after unique anchor; sim={sim:.2f}, overlap={overlap:.2f}, shared_terms={len(shared)}."
            ],
            [],
            1,
        ), replacement

    return None
def locate_header_anchor_tail_region_atom(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    warnings: list[str],
) -> tuple[MatchResult, list[str]] | None:
    """Replace a small tail region immediately after hunk-header context.

    Intended for malformed/stale tail hunks where the hunk header carries
    useful trailing context, the explicit old lines have drifted and no
    following context anchor is available.

    Deliberately conservative:
    - Markdown files only
    - the hunk must have both removed and added lines and no context lines
    - at most 8 removed and 10 added lines
    - the header context must resolve to one file line (unique anchor, or a
      unique fuzzy match at 0.78)
    - the replaced span starts at the first nonblank line after that anchor
      and is as long as the hunk's old region (capped by the end of file)
    - the span must resemble the old region line by line

    Returns ``(MatchResult, replacement_lines)`` or ``None`` when the
    strategy does not apply. When the span already holds the replacement the
    result has ``found=False`` and an ``already_applied_...`` method.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return None

    _old_block, _new_block, removed_only, added_only, context_only = old_new_blocks(hunk)
    if not (removed_only and added_only):
        return None
    # Only true tail hunks qualify, i.e. the header is the sole usable anchor.
    # Hunks with explicit context lines belong to the context-bracket /
    # add-only / side-anchored strategies; bailing out here keeps weak
    # header-tail matches from hijacking normal mid-file edits.
    if context_only:
        return None
    if len(removed_only) > 8 or len(added_only) > 10:
        return None

    bounds = hunk_change_bounds(hunk)
    if bounds is None:
        return None
    first_change, last_change = bounds

    header_text = hunk_header_context_text(hunk.header)
    if not header_text:
        return None

    found = find_unique_context_anchor(file_lines, header_text, path)
    if not found:
        fuzzy_header = find_unique_fuzzy_line(
            file_lines=file_lines,
            target_line=header_text,
            path=path,
            min_score=0.78,
        )
        if fuzzy_header:
            found = (fuzzy_header[0], "hunk_header_fuzzy")
    if not found:
        return None
    anchor_i, anchor_method = found

    # Take old/new regions from the whole changed slice of the hunk rather
    # than from removed/added lines alone, so interior blank/context lines
    # survive.
    changed_slice = hunk.lines[first_change : last_change + 1]
    old_region = [dl.text for dl in changed_slice if dl.kind in {"context", "remove"}]
    nearby = file_lines[max(0, anchor_i - 3) : min(len(file_lines), anchor_i + 10)]
    replacement = hunk_new_region_lines(
        hunk=hunk,
        start=first_change,
        end=last_change,
        path=path,
        preferred_marker=infer_local_bullet_marker(nearby, path),
    )
    if not old_region or not replacement:
        return None
    if len(old_region) > 12 or len(replacement) > 16:
        return None

    # The hunk region begins at its first changed line, so skip blank lines
    # after the anchor; this keeps the blank separation in the file.
    total = len(file_lines)
    start = anchor_i + 1
    while start < total and not file_lines[start].strip():
        start += 1
    if start >= total:
        return None
    end = min(total, start + len(old_region))
    span = end - start
    if span <= 0 or span > 12:
        return None

    # Guard against random tail cuts: the text in place must still resemble
    # the stale old region. The match is rejected only when BOTH the average
    # and the best per-line similarity are weak.
    current = file_lines[start:end]
    scores = [
        markdown_or_normal_similarity(have, want, path)
        for have, want in zip(current, old_region)
    ]
    if scores:
        avg_score = sum(scores) / len(scores)
        best_score = max(scores)
    else:
        avg_score = 0.0
        best_score = 0.0
    if avg_score < 0.45 and best_score < 0.70:
        return None

    if block_already_matches(current, replacement, path):
        return MatchResult(
            found=False,
            confidence=0.96,
            method="already_applied_header_anchor_tail_region_atom",
            warnings=warnings + [
                f"Header-anchor tail region already matches after unique header context ({anchor_method})."
            ],
            missing_lines=[],
            candidate_count=1,
        ), replacement

    return MatchResult(
        True,
        start,
        end,
        0.91,
        f"header_anchor_tail_region_atom_{anchor_method}",
        warnings + [
            f"Header-anchor tail region atom used; avg_score={avg_score:.2f}, best_score={best_score:.2f}."
        ],
        [],
        1,
    ), replacement
def locate_prev_anchor_tail_replacement_atom(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    warnings: list[str],
) -> tuple[MatchResult, list[str]] | None:
    """Replace a bounded stale tail region after a unique previous context.

    Handles hunks like:

        context anchor
        -old line 1
        -old line 2
        +new line 1
        +new line 2

    with no following context line.

    This is deliberately limited:
    - must have a unique previous context anchor (the hunk-header context
      text is used when the hunk has no previous context line)
    - must have removals and additions
    - must have no following context anchor
    - replacement/removal span must be small
    - old-side lines must fuzzy-match a nearby local span

    When context lines are embedded between the changed lines, the replaced
    span is measured on the old side of the whole change region (context plus
    removed lines), because the replacement is requested for that same
    region. Sizing the span by the removed lines alone would make it too
    short and leave the last removed line in the file.

    Returns:
        ``(MatchResult, replacement_lines)`` when a span was located,
        otherwise ``None``.
    """
    _old_block, _new_block, removed_only, added_only, _context_only = old_new_blocks(hunk)
    if not removed_only or not added_only:
        return None
    if len(removed_only) > 12 or len(added_only) > 16:
        return None

    bounds = hunk_change_bounds(hunk)
    if bounds is None:
        return None
    first_change, last_change = bounds

    prev_ctx = nearest_context_before(hunk, first_change)
    next_ctx = nearest_context_after(hunk, last_change)

    # This strategy is specifically for tail hunks without a following anchor.
    if next_ctx:
        return None

    # Old side of the change region. Without embedded context this is exactly
    # removed_only, so the common case behaves as before.
    region = hunk.lines[first_change : last_change + 1]
    lead = 0  # old-side context lines that precede the first removed line
    if any(dl.kind == "context" for dl in region):
        old_region = [dl.text for dl in region if dl.kind in {"context", "remove"}]
        for dl in region:
            if dl.kind == "remove":
                break
            if dl.kind == "context":
                lead += 1
        if len(old_region) > 12:
            return None
    else:
        old_region = removed_only

    prev_anchor_text: str | None = prev_ctx[1] if prev_ctx else None
    prev_anchor_source = "context"
    if not prev_anchor_text:
        prev_anchor_text = hunk_header_context_text(hunk.header)
        prev_anchor_source = "hunk_header"
    if not prev_anchor_text:
        return None

    found_prev = find_unique_context_anchor(file_lines, prev_anchor_text, path)
    if not found_prev:
        return None
    prev_line, prev_method = found_prev

    # Only look in a bounded window right after the anchor.
    search_start = prev_line + 1
    search_end = min(len(file_lines), prev_line + 40)
    if search_start >= search_end:
        return None

    first_found = find_best_fuzzy_line_in_range(
        file_lines=file_lines,
        target_line=removed_only[0],
        path=path,
        start=search_start,
        end=search_end,
        min_score=0.68,
    )
    if first_found is None:
        # Hunk-header tail fallback: stale old lines may no longer fuzzy-match,
        # but the hunk header can still identify the local tail region. Only do
        # this for small tail hunks and only immediately after the anchor.
        if prev_anchor_source != "hunk_header" or len(removed_only) > 6 or len(added_only) > 8:
            return None
        start = search_start
        first_score = 0.70
    else:
        first_removed_at, first_score = first_found
        # Step back over old-side context that sits before the first removal.
        start = first_removed_at - lead
        if start < search_start:
            return None

    end = start + len(old_region)
    if end > search_end or end > len(file_lines):
        return None
    candidate = file_lines[start:end]
    if len(candidate) != len(old_region):
        return None

    scores = [
        markdown_or_normal_similarity(file_line, old_line, path)
        for file_line, old_line in zip(candidate, old_region)
    ]
    avg_score = sum(scores) / len(scores)
    if first_score < 0.68 or avg_score < 0.66:
        return None

    local_marker = infer_local_bullet_marker(
        file_lines[max(0, start - 4) : min(len(file_lines), end + 4)],
        path,
    )
    replacement = hunk_new_region_lines(
        hunk=hunk,
        start=first_change,
        end=last_change,
        path=path,
        preferred_marker=local_marker,
    )
    if len(replacement) > 24:
        return None

    return MatchResult(
        True,
        start,
        end,
        0.91,
        f"prev_anchor_tail_replacement_atom_{prev_anchor_source}_{prev_method}",
        warnings + [
            f"Previous-anchor tail replacement atom used from {prev_anchor_source}; first_score={first_score:.2f}, avg_score={avg_score:.2f}."
        ],
        [],
        1,
    ), replacement
def locate_side_anchored_single_line_atom(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    warnings: list[str],
) -> tuple[MatchResult, list[str]] | None:
    """Replace one stale line near one or two unique context anchors.

    This is for AI patches where the old line has drifted, but the surrounding
    context still identifies the location safely.

    Search window:
    - both anchors found: the lines strictly between them
    - only the previous anchor: from the line after it up to ``prev + 8``
    - only the following anchor: the lines from ``next - 8`` up to it
    Empty windows and windows wider than 20 lines are rejected.

    If nothing in the window fuzzy-matches and the removed line is a numbered
    Markdown item, a numbered-list fallback replaces the first numbered item
    after the previous anchor, extended over context lines embedded in the
    change region. When a following anchor was found, the fallback stops
    before it, so the anchor line the hunk keeps is never overwritten.

    Returns:
        ``(MatchResult, replacement_lines)`` when a target was located,
        otherwise ``None``. ``warnings`` is appended to when the numbered-list
        span is rejected as too large.
    """
    _old_block, _new_block, removed_only, added_only, _context_only = old_new_blocks(hunk)
    if len(removed_only) != 1 or not added_only:
        return None

    bounds = hunk_change_bounds(hunk)
    if bounds is None:
        return None
    first_change, last_change = bounds

    prev_ctx = nearest_context_before(hunk, first_change)
    next_ctx = nearest_context_after(hunk, last_change)
    found_prev = find_unique_context_anchor(file_lines, prev_ctx[1], path) if prev_ctx else None
    found_next = find_unique_context_anchor(file_lines, next_ctx[1], path) if next_ctx else None

    if found_prev and found_next:
        prev_line, prev_method = found_prev
        next_line, next_method = found_next
        if prev_line >= next_line:
            return None
        search_start = prev_line + 1
        search_end = next_line
        anchor_desc = f"{prev_method}/{next_method}"
    elif found_prev:
        prev_line, prev_method = found_prev
        search_start = prev_line + 1
        search_end = min(len(file_lines), prev_line + 8)
        anchor_desc = f"{prev_method}/none"
    elif found_next:
        next_line, next_method = found_next
        search_start = max(0, next_line - 8)
        search_end = next_line
        anchor_desc = f"none/{next_method}"
    else:
        return None

    if search_end <= search_start or (search_end - search_start) > 20:
        return None

    found = find_best_fuzzy_line_in_range(
        file_lines=file_lines,
        target_line=removed_only[0],
        path=path,
        start=search_start,
        end=search_end,
        min_score=0.74,
    )

    # Extra fallback for numbered Markdown question lists.
    #
    # Handles tail/list patches like:
    #   6. existing anchor
    #   -7. old question
    #   +7. new question
    #   8. existing context
    #   +9. new question
    #   +10. new question
    #
    # Replacing only line 7 with the added lines would place 9/10 before 8, so
    # the local numbered span is replaced and embedded context order is kept.
    if found is None and found_prev and is_numbered_markdown_item(removed_only[0]):
        prev_line = found_prev[0]
        scan_limit = min(len(file_lines), prev_line + 20)
        # Never scan into or past a known following anchor: the hunk says that
        # line stays, so it must not become part of the replaced span.
        span_limit = min(scan_limit, found_next[0]) if found_next else scan_limit

        # Find the first numbered item after the previous anchor.
        candidate: int | None = None
        for i in range(prev_line + 1, span_limit):
            if not file_lines[i].strip():
                continue
            if is_numbered_markdown_item(file_lines[i]):
                candidate = i
                break

        if candidate is not None:
            local_marker = infer_local_bullet_marker(
                file_lines[max(0, prev_line - 3) : min(len(file_lines), scan_limit)],
                path,
            )
            replacement = hunk_new_region_lines(
                hunk=hunk,
                start=first_change,
                end=last_change,
                path=path,
                preferred_marker=local_marker,
            )

            replace_end = candidate + 1
            # If the replacement region contains existing context lines after
            # the removed item, include those target-file lines in the span.
            # This keeps order correct for: replace 7, keep 8, append 9/10.
            search_from = candidate + 1
            for dl in hunk.lines[first_change : last_change + 1]:
                if dl.kind != "context" or not dl.text.strip():
                    continue
                for j in range(search_from, span_limit):
                    if (
                        file_lines[j] == dl.text
                        or normalize_line(file_lines[j]) == normalize_line(dl.text)
                        or (
                            path.suffix.lower() in MARKDOWN_EXTENSIONS
                            and "markdown_lines_equiv" in globals()
                            and markdown_lines_equiv(file_lines[j], dl.text)
                        )
                    ):
                        replace_end = j + 1
                        search_from = j + 1
                        break

            if (replace_end - candidate) > 20:
                warnings.append(
                    f"Numbered-list replacement span too large ({replace_end - candidate} lines); skipped."
                )
                return None

            return MatchResult(
                True,
                candidate,
                replace_end,
                0.91,
                "numbered_list_span_after_prev_anchor_atom",
                warnings + ["Numbered-list span atom used after unique previous numbered anchor."],
                [],
                1,
            ), replacement

    if found is None:
        return None

    line_index, score = found
    replacement = replacement_preserving_single_line_style(
        added_lines=added_only,
        matched_old_line=file_lines[line_index],
        path=path,
    )
    return MatchResult(
        True,
        line_index,
        line_index + 1,
        max(0.90, min(0.93, score)),
        f"side_anchored_single_line_atom_{anchor_desc}",
        warnings + [f"Side-anchored stale line atom used; score={score:.2f}."],
        [],
        1,
    ), replacement
def locate_add_only_by_context_atom(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    warnings: list[str],
) -> tuple[MatchResult, list[str]] | None:
    """Place an add-only hunk by anchoring on its surrounding context lines.

    Strategies, tried in order:
    1. Pair mode: a previous and a following context line that together
       match exactly one place in the file (the following line at most 34
       lines after the previous one). This covers inserts where a single
       context line, such as a repeated bullet, appears elsewhere too.
    2. The nearest previous context line that is unique in the file.
    3. The nearest following context line that is unique in the file.

    Only the first eight non-blank context lines on each side are tried.
    Hunks with removals, without additions, or with more than 80 added lines
    are not handled.

    Returns:
        ``(MatchResult, added_lines)``; the result has ``found=False`` when
        the added lines appear to be present already. ``None`` when no anchor
        qualifies. ``warnings`` is appended to when a context pair is
        ambiguous.
    """
    _, _, removed_only, added_only, _ = old_new_blocks(hunk)
    if removed_only or not added_only:
        return None

    add_positions = [pos for pos, dl in enumerate(hunk.lines) if dl.kind == "add"]
    if not add_positions or len(added_only) > 80:
        return None
    first_add, last_add = add_positions[0], add_positions[-1]

    def match_kind(file_line: str, ctx: str) -> str:
        """Return how *file_line* matches *ctx*, or "" when it does not."""
        if file_line == ctx:
            return "exact"
        if normalize_line(file_line) == normalize_line(ctx):
            return "normalized"
        if (
            path.suffix.lower() in MARKDOWN_EXTENSIONS
            and "markdown_lines_equiv" in globals()
            and markdown_lines_equiv(file_line, ctx)
        ):
            return "markdown_equiv"
        return ""

    def nonblank_context(diff_lines) -> list[str]:
        """Collect the text of non-blank context lines, in the given order."""
        return [dl.text for dl in diff_lines if dl.kind == "context" and dl.text.strip()]

    # Previous contexts are ordered nearest-first (walking backwards).
    prev_contexts = nonblank_context(reversed(hunk.lines[:first_add]))
    next_contexts = nonblank_context(hunk.lines[last_add + 1 :])
    total = len(file_lines)

    # 1) Pair mode: previous + next context together must identify one gap.
    for prev_ctx in prev_contexts[:8]:
        for next_ctx in next_contexts[:8]:
            pairs: list[tuple[int, int, str, str]] = []
            for i, file_line in enumerate(file_lines):
                prev_method = match_kind(file_line, prev_ctx)
                if not prev_method:
                    continue
                for j in range(i + 1, min(total, i + 35)):
                    next_method = match_kind(file_lines[j], next_ctx)
                    if next_method:
                        pairs.append((i, j, prev_method, next_method))
                        break

            if len(pairs) > 1:
                warnings.append(
                    f"Add-only context pair matched {len(pairs)} places; trying other anchors."
                )
                continue
            if not pairs:
                continue

            prev_i, next_i, prev_method, next_method = pairs[0]

            # If the added block already exists inside the bracket, call it done.
            between = file_lines[prev_i + 1 : next_i]
            if find_exact_block(between, added_only) or find_normalized_block(between, added_only):
                return MatchResult(
                    found=False,
                    confidence=0.97,
                    method="already_applied_add_only_between_context_pair",
                    warnings=warnings + ["Added block already exists between unique context pair."],
                    missing_lines=[],
                    candidate_count=1,
                ), added_only

            # Idempotency fallback for malformed Markdown task/list hunks.
            #
            # After a successful apply, task-body lines may be reclassified as
            # context. The following anchor can then be one of the newly added
            # lines, leaving the bracket empty and making the heading look
            # insertable again. So before inserting, check whether every added
            # line already exists near the pair.
            added_needles = [
                line
                for line in sanitize_replacement_lines(added_only, path)
                if line.strip()
            ]
            if added_needles:
                near_start = max(0, prev_i - 5)
                near_end = min(total, next_i + 60)
                if all(
                    line_present_in_range(
                        file_lines=file_lines,
                        needle=needle,
                        path=path,
                        start=near_start,
                        end=near_end,
                        min_fuzzy_score=0.92,
                    )
                    for needle in added_needles
                ):
                    return MatchResult(
                        found=False,
                        confidence=0.96,
                        method="already_applied_add_only_near_context_pair",
                        warnings=warnings + [
                            "Added lines already exist near unique context pair; hunk appears already applied."
                        ],
                        missing_lines=[],
                        candidate_count=1,
                    ), added_only

            insert_at = prev_i + 1
            return MatchResult(
                True,
                insert_at,
                insert_at,
                0.95,
                f"insert_between_context_pair_atom_{prev_method}_{next_method}",
                warnings + ["Add-only atom inserted between unique previous/following context pair."],
                [],
                1,
            ), added_only

    # 2) Nearest unique previous context: insert right after it.
    for ctx in prev_contexts[:8]:
        anchor = find_unique_context_anchor(file_lines, ctx, path)
        if anchor:
            idx, method = anchor
            return MatchResult(
                True,
                idx + 1,
                idx + 1,
                0.94,
                f"insert_after_any_context_atom_{method}",
                warnings + ["Add-only atom inserted after unique nearby previous context."],
                [],
                1,
            ), added_only

    # 3) Nearest unique following context: insert right before it.
    for ctx in next_contexts[:8]:
        anchor = find_unique_context_anchor(file_lines, ctx, path)
        if anchor:
            idx, method = anchor
            return MatchResult(
                True,
                idx,
                idx,
                0.94,
                f"insert_before_any_context_atom_{method}",
                warnings + ["Add-only atom inserted before unique nearby following context."],
                [],
                1,
            ), added_only

    return None
def locate_edit_atom_fallback(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    warnings: list[str],
) -> tuple[MatchResult, list[str]] | None:
    """Human-style fallback for obvious edit atoms.

    Tried in this order; the first strategy that yields a result wins:
    - added block already present exactly once (exact, then
      whitespace-normalized): reported as already applied
    - add-only insert by context anchors
    - replacement cluster between two context anchors
    - the stale paragraph / stale tail strategies
    - one stale line replaced near a unique context anchor
    - one stale line replaced by a unique fuzzy match anywhere in the file
    - add-only block inserted after/before the nearest unique context line

    Returns:
        ``(MatchResult, replacement_lines)`` or ``None`` when nothing applies.
    """
    _old_block, new_block, removed_only, added_only, context_only = old_new_blocks(hunk)

    if path.suffix.lower() in MARKDOWN_EXTENSIONS and "sanitize_replacement_lines" in globals():
        new_block = sanitize_replacement_lines(new_block, path)
        added_only = sanitize_replacement_lines(added_only, path)

    # Already-applied check: the added block exists exactly once in the file.
    if added_only:
        if len(find_exact_block(file_lines, added_only)) == 1:
            return MatchResult(
                found=False,
                confidence=0.97,
                method="already_applied_added_block_atom",
                warnings=warnings + ["Added atom already exists uniquely; hunk appears already applied."],
                missing_lines=[],
                candidate_count=1,
            ), new_block

        if len(find_normalized_block(file_lines, added_only)) == 1:
            return MatchResult(
                found=False,
                confidence=0.95,
                method="already_applied_added_block_atom_normalized",
                warnings=warnings + ["Whitespace-normalized added atom already exists uniquely; hunk appears already applied."],
                missing_lines=[],
                candidate_count=1,
            ), new_block

    # Delegated strategies, most specific anchoring first.
    strategies = (
        locate_add_only_by_context_atom,              # add-only insert by context
        locate_context_bracket_atom,                  # Case A: context bracket
        locate_collapsed_stale_paragraph_atom,        # Case B-4: collapsed paragraph
        locate_stale_paragraph_by_header_window_atom, # Case B-3: header locality
        locate_stale_paragraph_after_anchor_atom,     # Case B-2: after unique anchor
        locate_header_anchor_tail_region_atom,        # Case B-1: small header tail
        locate_prev_anchor_tail_replacement_atom,     # Case B0: bounded stale tail
        locate_side_anchored_single_line_atom,        # Case B: one stale line
    )
    for strategy in strategies:
        outcome = strategy(file_lines, hunk, path, warnings)
        if outcome is not None:
            return outcome

    # Case C: one-line stale replacement by a unique fuzzy match.
    if len(removed_only) == 1 and added_only:
        fuzzy_hit = find_unique_fuzzy_line(file_lines, removed_only[0], path, min_score=0.86)
        if fuzzy_hit:
            line_index, score = fuzzy_hit
            replacement = replacement_preserving_single_line_style(
                added_lines=added_only,
                matched_old_line=file_lines[line_index],
                path=path,
            )
            return MatchResult(
                True,
                line_index,
                line_index + 1,
                max(0.90, min(0.94, score)),
                "fuzzy_single_line_atom",
                warnings + [f"Fuzzy single-line atom match used; score={score:.2f}."],
                [],
                1,
            ), replacement

    # Case D: add-only block. Insert by nearest context line in the hunk.
    if not removed_only and added_only and context_only:
        add_positions = [i for i, dl in enumerate(hunk.lines) if dl.kind == "add"]
        if add_positions:
            before = next(
                (
                    dl.text
                    for dl in reversed(hunk.lines[: add_positions[0]])
                    if dl.kind == "context" and dl.text.strip()
                ),
                None,
            )
            after = next(
                (
                    dl.text
                    for dl in hunk.lines[add_positions[-1] + 1 :]
                    if dl.kind == "context" and dl.text.strip()
                ),
                None,
            )

            if before:
                anchor = find_unique_context_anchor(file_lines, before, path)
                if anchor:
                    idx, method = anchor
                    return MatchResult(
                        True,
                        idx + 1,
                        idx + 1,
                        0.93,
                        f"insert_after_context_atom_{method}",
                        warnings + ["Add-only atom inserted after unique previous context."],
                        [],
                        1,
                    ), added_only

            if after:
                anchor = find_unique_context_anchor(file_lines, after, path)
                if anchor:
                    idx, method = anchor
                    return MatchResult(
                        True,
                        idx,
                        idx,
                        0.93,
                        f"insert_before_context_atom_{method}",
                        warnings + ["Add-only atom inserted before unique following context."],
                        [],
                        1,
                    ), added_only

    return None
def find_ordered_line_subsequence_spans(
    file_lines: list[str],
    needles: list[str],
    max_span: int = 90,
    max_gap: int = 25,
) -> list[tuple[int, int]]:
    """Find compact ordered occurrences of normalized needle lines.

    Blank needles are ignored. Starting at every file line equal (after
    normalization) to the first needle, each later needle is matched greedily
    at its first occurrence within ``max_gap`` lines of the previous match.

    Returns:
        ``(start, end)`` half-open index pairs, in file order, for every start
        where all needles were found and ``end - start <= max_span``.
    """
    haystack = [normalize_line(line) for line in file_lines]
    wanted = [normalize_line(line) for line in needles if line.strip()]
    if not wanted:
        return []

    head, rest = wanted[0], wanted[1:]
    total = len(haystack)
    results: list[tuple[int, int]] = []

    for start, text in enumerate(haystack):
        if text != head:
            continue
        cursor = start
        for needle in rest:
            window_end = min(total, cursor + max_gap + 1)
            hit = next(
                (j for j in range(cursor + 1, window_end) if haystack[j] == needle),
                None,
            )
            if hit is None:
                break
            cursor = hit
        else:
            # Every needle was located; keep the span only if it is compact.
            if cursor + 1 - start <= max_span:
                results.append((start, cursor + 1))

    # Dedupe while preserving order.
    return list(dict.fromkeys(results))
def find_ordered_fuzzy_line_subsequence_spans(
    file_lines: list[str],
    needles: list[str],
    path: Path,
    max_span: int = 170,
    max_gap: int = 45,
    min_line_score: float = 0.78,
    min_avg_score: float = 0.86,
) -> list[tuple[int, int, float, float]]:
    """Find compact ordered fuzzy occurrences of needle lines.

    Intended for already-applied/idempotency detection only, not for applying:
    it tolerates small wording, sanitation, or context-repair drift that
    defeats exact normalized subsequence matching.

    Every file line scoring at least ``min_line_score`` against the first
    non-blank needle seeds a candidate. Each later needle takes the
    best-scoring line (earliest on ties) within ``max_gap`` lines of the
    previous match.

    Returns:
        ``(start, end, avg_score, min_score)`` tuples with half-open
        ``start``/``end``, ordered by descending ``(avg_score, min_score)``.
    """
    wanted = [line for line in needles if line.strip()]
    if not wanted:
        return []

    head, rest = wanted[0], wanted[1:]
    total = len(file_lines)

    # Plausible positions for the first needle.
    seeds: list[tuple[int, float]] = []
    for index, line in enumerate(file_lines):
        seed_score = markdown_or_normal_similarity(line, head, path)
        if seed_score >= min_line_score:
            seeds.append((index, seed_score))

    found: list[tuple[int, int, float, float]] = []
    for start, seed_score in seeds:
        cursor = start
        line_scores = [seed_score]
        for needle in rest:
            window_end = min(total, cursor + max_gap + 1)
            hits: list[tuple[int, float]] = []
            for j in range(cursor + 1, window_end):
                hit_score = markdown_or_normal_similarity(file_lines[j], needle, path)
                if hit_score >= min_line_score:
                    hits.append((j, hit_score))
            if not hits:
                break
            # max() keeps the earliest line among equally good scores.
            cursor, best_score = max(hits, key=lambda hit: hit[1])
            line_scores.append(best_score)
        else:
            if cursor + 1 - start > max_span:
                continue
            mean_score = sum(line_scores) / len(line_scores)
            weakest = min(line_scores)
            if mean_score >= min_avg_score and weakest >= min_line_score:
                found.append((start, cursor + 1, mean_score, weakest))

    found.sort(key=lambda item: (item[2], item[3]), reverse=True)
    return found
def choose_unique_or_header_nearest_fuzzy_span(
    spans: list[tuple[int, int, float, float]],
    hunk: Hunk,
    max_distance: int = 190,
) -> tuple[int, int, float, float] | None:
    """Choose one fuzzy span, requiring uniqueness or header-local tie-break.

    ``spans`` is taken as already ordered best-first: the first entry is the
    leader, and every span whose average score (index 2) is within 0.025 of
    the leader's counts as a contender.

    Returns:
        The leader when it is the only contender. Otherwise the contender
        whose start is closest to the hunk header's old start, provided that
        start is known, the closest is strictly closer than the runner-up,
        and it is within ``max_distance``. ``None`` in every other case.
    """
    if not spans:
        return None

    leader = spans[0]
    contenders = [span for span in spans if leader[2] - span[2] < 0.025]
    if len(contenders) == 1:
        return leader

    old_start = parse_hunk_old_start(hunk.header)
    if old_start is None:
        return None

    def distance(span: tuple[int, int, float, float]) -> int:
        return abs(span[0] - old_start)

    by_distance = sorted(contenders, key=distance)
    nearest = by_distance[0]
    runner_up = by_distance[1]

    # An exact distance tie cannot be broken, and a far-away winner is unsafe.
    if distance(nearest) == distance(runner_up) or distance(nearest) > max_distance:
        return None
    return nearest
def line_present_in_range(
    file_lines: list[str],
    needle: str,
    path: Path,
    start: int,
    end: int,
    min_fuzzy_score: float = 0.88,
) -> bool:
    """Check whether one line exists in a bounded range.

    ``start``/``end`` are clamped to the file and treated as half-open. A
    line counts as present when it equals the needle, equals it after
    normalization, is Markdown-equivalent (Markdown files only), or reaches
    ``min_fuzzy_score`` similarity. Checks run in that order.
    """
    lo = max(0, start)
    hi = min(len(file_lines), end)
    needle_norm = normalize_line(needle)

    def is_hit(line: str) -> bool:
        if line == needle or normalize_line(line) == needle_norm:
            return True
        if (
            path.suffix.lower() in MARKDOWN_EXTENSIONS
            and "markdown_lines_equiv" in globals()
            and markdown_lines_equiv(line, needle)
        ):
            return True
        return markdown_or_normal_similarity(line, needle, path) >= min_fuzzy_score

    return any(is_hit(line) for line in file_lines[lo:hi])
def meaningful_new_block_lines_for_idempotency(
    lines: list[str],
    removed_only: list[str],
    path: Path,
) -> list[str]:
    """Extract useful final-state evidence from a hunk new_block.

    For malformed Markdown hunks where parser repair reclassifies final
    intended lines as context, so added_only is incomplete.

    After sanitizing ``lines``, drops:
    - blank lines
    - code-fence lines (starting with three backticks)
    - lines equal (normalized) to a removed line
    - a fixed set of generic labels

    Keeps arrow-flow lines (``->``), bullets, and text of 35+ characters.

    Returns:
        The kept lines in original order, deduplicated by normalized text
        (first occurrence wins).
    """
    stale_keys = {normalize_line(line) for line in removed_only if line.strip()}
    generic_labels = {"Canonical flow:", "Recommended model:", "Operationally, this means:"}
    bullet_prefixes = ("- ", "* ", "+ ")

    # Insertion-ordered mapping: normalized text -> first line carrying it.
    kept: dict[str, str] = {}
    for line in sanitize_replacement_lines(lines, path):
        body = line.strip()
        key = normalize_line(line)
        if not body or body.startswith("```"):
            continue
        if key in stale_keys or body in generic_labels:
            continue
        informative = (
            body.startswith("->")
            or body.startswith(bullet_prefixes)
            or len(body) >= 35
        )
        if informative:
            kept.setdefault(key, line)

    return list(kept.values())
def locate_already_applied_new_block_lines_present(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    removed_only: list[str],
    new_block: list[str],
    warnings: list[str],
    markdown_recovery: bool,
) -> MatchResult | None:
    """Detect already-applied malformed Markdown replacements by new-block evidence.

    Idempotency-only. It catches hunks where:
    - old removed lines are gone from the file
    - final intended lines exist near the hunk location
    - added_only is incomplete because repair reclassified arrow-flow/code-fence
      lines as context

    Requirements: a Markdown path; at least ``max(2, min(3, n))`` removed
    lines reported missing (``n`` = non-blank removed lines); 2 to 40
    evidence lines; a parsable old start in the hunk header. Evidence is
    searched from 100 lines before to 220 lines after that start. One
    evidence line may be absent when there are five or more.

    Returns:
        A ``MatchResult`` with ``found=False`` when the hunk looks already
        applied, otherwise ``None``.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return None
    if not removed_only or not new_block:
        return None

    gone = line_presence_missing(
        file_lines=file_lines,
        expected_lines=removed_only,
        path=path,
        markdown_recovery=markdown_recovery,
    )
    nonblank_removed = [line for line in removed_only if line.strip()]

    # Require strong evidence that the old side is gone.
    required_gone = max(2, min(3, len(nonblank_removed)))
    if len(gone) < required_gone:
        return None

    evidence = meaningful_new_block_lines_for_idempotency(
        lines=new_block,
        removed_only=removed_only,
        path=path,
    )
    # Two strong final lines can be enough for messy split hunks when several
    # old lines are missing; one line is too weak, and over 40 is out of scope.
    if len(evidence) < 2 or len(evidence) > 40:
        return None

    old_start = parse_hunk_old_start(hunk.header)
    if old_start is None:
        return None

    window_start = max(0, old_start - 100)
    window_end = min(len(file_lines), old_start + 220)
    absent = [
        needle
        for needle in evidence
        if not line_present_in_range(
            file_lines=file_lines,
            needle=needle,
            path=path,
            start=window_start,
            end=window_end,
            min_fuzzy_score=0.88,
        )
    ]

    # Allow one miss if we still have many good needles. This handles a line
    # that was sanitized or wrapped differently after apply.
    tolerated = 1 if len(evidence) >= 5 else 0
    if len(absent) > tolerated:
        return None

    return MatchResult(
        found=False,
        confidence=0.95,
        method="already_applied_new_block_lines_present",
        warnings=warnings + [
            f"Final new-block evidence already present near hunk location; old removed lines missing={len(gone)}, evidence_lines={len(evidence)}, missing_evidence={len(absent)}."
        ],
        missing_lines=[],
        candidate_count=len(evidence),
    )
def locate_already_applied_added_lines_present(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    removed_only: list[str],
    added_only: list[str],
    warnings: list[str],
    markdown_recovery: bool,
) -> MatchResult | None:
    """Detect already-applied malformed Markdown replacement by added-line presence.

    An idempotency-only fallback for cases where:
    - old removed lines are gone
    - added lines are present near the hunk's original location
    - full new_block matching fails because parser repair reclassified
      Markdown code-fence / arrow-flow lines as context

    Conservative limits:
    - Markdown only
    - replacement hunks only (both removed and added lines)
    - 3 to 30 non-blank added lines after sanitizing
    - at least ``max(1, min(3, n))`` removed lines reported missing
      (``n`` = non-blank removed lines)
    - every added line present between 80 lines before and 180 lines after
      the hunk header's old start

    Returns:
        A ``MatchResult`` with ``found=False`` when the hunk looks already
        applied, otherwise ``None``.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return None
    if not removed_only or not added_only:
        return None

    added_needles = [
        line
        for line in sanitize_replacement_lines(added_only, path)
        if line.strip()
    ]
    if len(added_needles) < 3 or len(added_needles) > 30:
        return None

    gone = line_presence_missing(
        file_lines=file_lines,
        expected_lines=removed_only,
        path=path,
        markdown_recovery=markdown_recovery,
    )
    if not gone:
        return None

    # Require meaningful removal drift, not just one changed whitespace line.
    nonblank_removed = [line for line in removed_only if line.strip()]
    if len(gone) < max(1, min(3, len(nonblank_removed))):
        return None

    old_start = parse_hunk_old_start(hunk.header)
    if old_start is None:
        return None

    window_start = max(0, old_start - 80)
    window_end = min(len(file_lines), old_start + 180)
    absent = [
        needle
        for needle in added_needles
        if not line_present_in_range(
            file_lines=file_lines,
            needle=needle,
            path=path,
            start=window_start,
            end=window_end,
            min_fuzzy_score=0.90,
        )
    ]
    if absent:
        return None

    return MatchResult(
        found=False,
        confidence=0.95,
        method="already_applied_added_lines_present",
        warnings=warnings + [
            f"Added lines already present near hunk location; old removed lines missing={len(gone)}."
        ],
        missing_lines=[],
        candidate_count=len(added_needles),
    )
def locate_already_applied_added_subsequence(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    removed_only: list[str],
    added_only: list[str],
    warnings: list[str],
    markdown_recovery: bool,
) -> MatchResult | None:
    """Idempotency fallback keyed on the added lines appearing in order.

    Reports a replacement hunk as already applied when the removed lines are
    (at least partly) gone and the nonblank added lines occur in the file as
    an ordered, compact subsequence.

    Returns a ``found=False`` MatchResult, or ``None`` when any of these
    conservative conditions fails:
        - Markdown target, hunk has both removals and additions
        - between 3 and 40 nonblank added lines
        - at least one removed line is missing from the file
        - at least one ordered span exists (max_span=110, max_gap=30)
        - with several spans, one must be strictly nearest to the hunk
          header's old-start line and lie within 160 lines of it
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return None
    if not (removed_only and added_only):
        return None

    # Blank-only and very tiny additions match too easily; huge ones are
    # out of scope for this fallback.
    needles = [entry for entry in added_only if entry.strip()]
    if not 3 <= len(needles) <= 40:
        return None

    missing_removed = line_presence_missing(
        file_lines=file_lines,
        expected_lines=removed_only,
        path=path,
        markdown_recovery=markdown_recovery,
    )
    if not missing_removed:
        return None

    spans = find_ordered_line_subsequence_spans(
        file_lines=file_lines,
        needles=needles,
        max_span=110,
        max_gap=30,
    )
    if not spans:
        return None

    if len(spans) > 1:
        # Several candidate spans: only accept when exactly one of them is
        # closest to where the hunk header says the change belongs.
        old_start = parse_hunk_old_start(hunk.header)
        if old_start is None:
            return None
        distances = sorted(abs(span[0] - old_start) for span in spans)
        nearest, runner_up = distances[0], distances[1]
        if nearest == runner_up:
            return None
        # Must still be local-ish to the original hunk.
        if nearest > 160:
            return None

    note = (
        f"Added lines already exist as a compact ordered subsequence; old removed lines missing={len(missing_removed)}."
    )
    return MatchResult(
        found=False,
        confidence=0.95,
        method="already_applied_added_subsequence",
        warnings=warnings + [note],
        missing_lines=[],
        candidate_count=len(spans),
    )
def locate_already_applied_new_subsequence(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    removed_only: list[str],
    new_block: list[str],
    warnings: list[str],
    markdown_recovery: bool,
) -> MatchResult | None:
    """Idempotency fallback keyed on the final shape of the whole new block.

    Covers hunks whose removed lines are gone and whose post-apply block is
    present in the file as an ordered compact subsequence, even though exact
    new-block matching failed (for example because of blank-line or context
    drift).

    Two passes are tried in order:
        1. exact ordered subsequence (max_span=160, max_gap=35); with several
           spans, one must be strictly nearest to the header's old-start line
           and within 180 lines of it
        2. fuzzy ordered subsequence (max_span=190, max_gap=50,
           min_line_score=0.76, min_avg_score=0.84), resolved by
           choose_unique_or_header_nearest_fuzzy_span with max_distance=210

    Returns a ``found=False`` MatchResult on success, otherwise ``None``.
    Preconditions: Markdown target, replacement hunk with a non-empty new
    block, at least one removed line missing, and 4 to 80 nonblank sanitized
    new-block lines.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return None
    if not (removed_only and new_block):
        return None

    missing_removed = line_presence_missing(
        file_lines=file_lines,
        expected_lines=removed_only,
        path=path,
        markdown_recovery=markdown_recovery,
    )
    if not missing_removed:
        return None

    needles = [
        entry
        for entry in sanitize_replacement_lines(new_block, path)
        if entry.strip()
    ]
    # Tiny blocks are too easy to match accidentally; very large ones are
    # outside what this fallback is meant to vouch for.
    if not 4 <= len(needles) <= 80:
        return None

    spans = find_ordered_line_subsequence_spans(
        file_lines=file_lines,
        needles=needles,
        max_span=160,
        max_gap=35,
    )
    if spans:
        if len(spans) == 1:
            exact_span_accepted = True
        else:
            old_start = parse_hunk_old_start(hunk.header)
            if old_start is None:
                exact_span_accepted = False
            else:
                distances = sorted(abs(span[0] - old_start) for span in spans)
                nearest, runner_up = distances[0], distances[1]
                exact_span_accepted = nearest != runner_up and nearest <= 180
        if exact_span_accepted:
            exact_note = (
                f"Final new-block lines already exist as compact ordered subsequence; old removed lines missing={len(missing_removed)}."
            )
            return MatchResult(
                found=False,
                confidence=0.96,
                method="already_applied_new_subsequence",
                warnings=warnings + [exact_note],
                missing_lines=[],
                candidate_count=len(spans),
            )

    # Exact ordering did not settle it; retry with per-line fuzzy scoring.
    fuzzy_spans = find_ordered_fuzzy_line_subsequence_spans(
        file_lines=file_lines,
        needles=needles,
        path=path,
        max_span=190,
        max_gap=50,
        min_line_score=0.76,
        min_avg_score=0.84,
    )
    chosen_fuzzy = choose_unique_or_header_nearest_fuzzy_span(
        fuzzy_spans,
        hunk=hunk,
        max_distance=210,
    )
    if chosen_fuzzy is None:
        return None

    _span_start, _span_end, avg_score, min_score = chosen_fuzzy
    fuzzy_note = (
        f"Final new-block lines already exist as compact fuzzy ordered subsequence; avg_score={avg_score:.2f}, min_score={min_score:.2f}, old removed lines missing={len(missing_removed)}."
    )
    return MatchResult(
        found=False,
        confidence=0.94,
        method="already_applied_new_fuzzy_subsequence",
        warnings=warnings + [fuzzy_note],
        missing_lines=[],
        candidate_count=len(fuzzy_spans),
    )
def looks_like_markdown_task_heading(line: str) -> bool:
    """Return True for a Markdown task list item such as `- [ ] Foo` or `* [x] Foo`.

    The line is stripped first, so a checkbox with nothing after it
    (`- [ ]`) does not count: the pattern requires whitespace after `]`.
    """
    trimmed = line.strip()
    return re.match(r"^\s*[-*+]\s+\[[ xX]\]\s+", trimmed) is not None
def target_contains_markdown_equiv_line(file_lines: list[str], line: str, path: Path) -> bool:
    """Tell whether `line` already occurs anywhere in the target file.

    A file line counts as a hit when it is identical to `line`, equal after
    normalize_line, or (Markdown targets only, and only if the optional
    markdown_lines_equiv helper is defined in this module) Markdown-equivalent.
    """
    wanted = normalize_line(line)
    # Neither the suffix nor the helper's availability changes per line.
    markdown_aware = (
        path.suffix.lower() in MARKDOWN_EXTENSIONS
        and "markdown_lines_equiv" in globals()
    )
    return any(
        candidate == line
        or normalize_line(candidate) == wanted
        or (markdown_aware and markdown_lines_equiv(candidate, line))
        for candidate in file_lines
    )
def normalize_task_body_addition(line: str) -> str:
    """Rewrite a malformed task-body bullet into the ` * ...` form.

    Task headings and lines that markdown_bullet_match does not recognise
    are returned unchanged.
    """
    if not looks_like_markdown_task_heading(line):
        bullet = markdown_bullet_match(line)
        if bullet:
            body = bullet.group("body").rstrip()
            return f" * {body}"
    return line
def repair_malformed_added_task_body_removals(
    hunk: Hunk,
    file_lines: list[str],
    path: Path,
) -> tuple[Hunk, list[str]]:
    """Turn bogus "removed" task-body bullets back into additions.

    Applies only to Markdown hunks that add at least one task heading. After
    the first added task heading, a removed line is reclassified as an
    addition when it looks like a Markdown bullet and the target file holds
    no equivalent line (so it cannot be a real removal).

    Returns ``(repaired_hunk, warnings)``. The original hunk and an empty
    warning list are returned unless at least two lines were reclassified.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return hunk, []

    adds_a_task = any(
        entry.kind == "add" and looks_like_markdown_task_heading(entry.text)
        for entry in hunk.lines
    )
    if not adds_a_task:
        return hunk, []

    notes: list[str] = []
    rebuilt: list[DiffLine] = []
    inside_added_task = False
    for entry in hunk.lines:
        if entry.kind == "add" and looks_like_markdown_task_heading(entry.text):
            inside_added_task = True
            rebuilt.append(entry)
        elif (
            inside_added_task
            and entry.kind == "remove"
            and looks_like_markdown_bullet(entry.text)
            and not target_contains_markdown_equiv_line(file_lines, entry.text, path)
        ):
            fixed_text = normalize_task_body_addition(entry.text)
            rebuilt.append(DiffLine("add", fixed_text, "+" + fixed_text))
            notes.append(
                f"Malformed added task-body removal reclassified as addition: {entry.text}"
            )
        else:
            rebuilt.append(entry)

    # One note is recorded per reclassified line; a single conversion is not
    # considered enough evidence to rewrite the hunk.
    if len(notes) < 2:
        return hunk, []

    repaired_hunk = Hunk(
        header=hunk.header,
        lines=rebuilt,
        parse_warnings=list(hunk.parse_warnings),
    )
    return repaired_hunk, notes
def repair_markdown_arrow_flow_context_removals(
    hunk: Hunk,
    file_lines: list[str],
    path: Path,
) -> tuple[Hunk, list[str]]:
    """Restore Markdown arrow-flow lines that were parsed as removals.

    A malformed AI diff can carry a genuine document line such as
        -> evaluate Products/Offers
    at column 1. The diff parser then sees a "-" marker followed by the text
    "> evaluate Products/Offers". For every removed line whose text starts
    with ">", the literal forms "-<text>" and "+<text>" are looked up in the
    target (exact first, whitespace-normalized otherwise). When the lookup
    yields exactly one distinct file line, the diff line becomes context
    carrying that file line.

    Only Markdown hunks containing at least one addition are touched.
    Returns ``(hunk, [])`` unchanged when nothing was repaired, else the
    rebuilt hunk plus one warning per reclassified line.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return hunk, []
    if all(entry.kind != "add" for entry in hunk.lines):
        return hunk, []

    def distinct_target_lines(text: str) -> list[str]:
        """Collect distinct file lines matching the "-"/"+" literal forms."""
        found: list[str] = []
        for literal in (f"-{text}", f"+{text}"):
            hits = [row for row in file_lines if row == literal]
            if not hits:
                hits = [
                    row for row in file_lines
                    if normalize_line(row) == normalize_line(literal)
                ]
            found.extend(hits)
        # Unique while preserving order.
        return list(dict.fromkeys(found))

    notes: list[str] = []
    rebuilt: list[DiffLine] = []
    for entry in hunk.lines:
        is_arrow_removal = entry.kind == "remove" and entry.text.startswith(">")
        targets = distinct_target_lines(entry.text) if is_arrow_removal else []
        if len(targets) == 1:
            target_line = targets[0]
            rebuilt.append(DiffLine("context", target_line, " " + target_line))
            notes.append(
                f"Parsed Markdown arrow-flow removal reclassified as context: {target_line}"
            )
        else:
            rebuilt.append(entry)

    if not notes:
        return hunk, []

    repaired_hunk = Hunk(
        header=hunk.header,
        lines=rebuilt,
        parse_warnings=list(hunk.parse_warnings),
    )
    return repaired_hunk, notes
def locate_hunk(
    file_lines: list[str],
    hunk: Hunk,
    path: Path,
    min_confidence: float,
    markdown_recovery: bool,
) -> tuple[MatchResult, list[str]]:
    """Decide where (and whether) one hunk applies to the target file.

    Runs a fixed cascade and returns at the first conclusive step:
        - Markdown hunk repairs (arrow-flow, parsed context, task bodies)
        - no-op hunks (no additions and no removals)
        - safety skips (placeholder lines, insertion without any anchor)
        - Strategy 0/0b: already-applied detection
        - Strategy 0c: edit-atom fallback
        - Strategies 1-5: exact, removed-only, Markdown-variant, normalized,
          Markdown-equivalent and context-insert block matches
        - Strategy 6: tail-anchor evidence (adds a warning only)
        - Strategy 7: fuzzy window fallbacks
        - otherwise a "not_found" result listing the missing lines

    Returns a pair: the MatchResult and the list of lines that go with it.
    That list is `new_block` on most paths, `added_only` for the
    "exact_removed_block" match, and the output of
    replacement_preserving_matched_context for the Markdown-equivalent
    matches.

    Note: `min_confidence` is accepted but not read anywhere in this body.
    """
    warnings = list(hunk.parse_warnings)
    # Each repair returns a (possibly rewritten) hunk plus its own warnings;
    # the repairs are chained, so later ones see earlier repairs' output.
    hunk, arrow_flow_repair_warnings = repair_markdown_arrow_flow_context_removals(
        hunk=hunk,
        file_lines=file_lines,
        path=path,
    )
    warnings.extend(arrow_flow_repair_warnings)
    hunk, parsed_repair_warnings = repair_parsed_markdown_context_removals(
        hunk=hunk,
        file_lines=file_lines,
        path=path,
    )
    warnings.extend(parsed_repair_warnings)
    hunk, task_body_repair_warnings = repair_malformed_added_task_body_removals(
        hunk=hunk,
        file_lines=file_lines,
        path=path,
    )
    warnings.extend(task_body_repair_warnings)
    old_block, new_block, removed_only, added_only, context_only = old_new_blocks(hunk)
    # Match against the same Markdown sanitation that apply_patch_file writes.
    # Otherwise a hunk can apply cleanly, but fail idempotency afterward because
    # the file contains sanitized output while new_block still contains AI
    # artifacts/trailing whitespace.
    if path.suffix.lower() in MARKDOWN_EXTENSIONS and "sanitize_replacement_lines" in globals():
        new_block = sanitize_replacement_lines(new_block, path)
        added_only = sanitize_replacement_lines(added_only, path)
    # AI diffs sometimes contain a hunk header plus context, but no actual
    # additions/removals. That is a malformed no-op hunk. Do not let it make the
    # patch unsafe.
    if not removed_only and not added_only:
        if not old_block:
            return MatchResult(
                found=False,
                confidence=1.0,
                method="noop_empty_hunk",
                warnings=warnings + ["Empty hunk has no changes; treated as no-op."],
                missing_lines=[],
            ), new_block
        # The context is looked up with progressively looser matching; every
        # outcome below is still a no-op, only method/confidence differ.
        exact = find_exact_block(file_lines, old_block)
        if len(exact) >= 1:
            return MatchResult(
                found=False,
                confidence=1.0,
                method="noop_context_only_hunk",
                warnings=warnings + ["Context-only hunk has no changes; treated as no-op."],
                missing_lines=[],
                candidate_count=len(exact),
            ), new_block
        normalized = find_normalized_block(file_lines, old_block)
        if len(normalized) >= 1:
            return MatchResult(
                found=False,
                confidence=0.98,
                method="noop_context_only_hunk_normalized",
                warnings=warnings + ["Whitespace-normalized context-only hunk has no changes; treated as no-op."],
                missing_lines=[],
                candidate_count=len(normalized),
            ), new_block
        if markdown_recovery and path.suffix.lower() in MARKDOWN_EXTENSIONS and "find_markdown_equiv_block" in globals():
            md = find_markdown_equiv_block(file_lines, old_block)
            if len(md) >= 1:
                return MatchResult(
                    found=False,
                    confidence=0.96,
                    method="noop_context_only_hunk_markdown_equiv",
                    warnings=warnings + ["Markdown-equivalent context-only hunk has no changes; treated as no-op."],
                    missing_lines=[],
                    candidate_count=len(md),
                ), new_block
        # Even if context drifted, there is still no requested edit. Keep this
        # non-dangerous and do not block the whole patch.
        return MatchResult(
            found=False,
            confidence=0.90,
            method="noop_context_only_hunk_unmatched",
            warnings=warnings + ["Context-only hunk has no changes but context was not found; treated as no-op."],
            missing_lines=[],
        ), new_block
    if any(is_placeholder(line) for line in old_block + new_block):
        return MatchResult(
            found=False,
            confidence=0.0,
            method="placeholder_detected",
            warnings=warnings + ["Placeholder line like '...' detected; hunk skipped for safety."],
            missing_lines=[],
        ), new_block
    if not old_block and added_only:
        # No old block means probably an add-file hunk with only additions.
        # If file is empty/new, insert at top. Otherwise skip in v1 unless context exists.
        if not file_lines:
            return MatchResult(found=True, start=0, end=0, confidence=0.99, method="add_to_empty_file", warnings=warnings), new_block
        return MatchResult(
            found=False,
            confidence=0.0,
            method="insert_without_anchor",
            warnings=warnings + ["Insertion has no old/context anchor; skipped for safety."],
            missing_lines=[],
        ), new_block
    # Strategy 0: already-applied detection.
    # Important for insert-only hunks: after applying once, some context-only old blocks can
    # still remain unique and would otherwise look applyable again.
    new_matches = find_exact_block(file_lines, new_block)
    if len(new_matches) >= 1:
        return MatchResult(
            found=False,
            confidence=1.0,
            method="already_applied",
            warnings=warnings + ["New block already exists; hunk appears already applied."],
            missing_lines=[],
            candidate_count=len(new_matches),
        ), new_block
    normalized_new_matches = find_normalized_block(file_lines, new_block)
    if len(normalized_new_matches) >= 1:
        return MatchResult(
            found=False,
            confidence=0.98,
            method="already_applied_normalized",
            warnings=warnings + ["Whitespace-normalized new block already exists; hunk appears already applied."],
            missing_lines=[],
            candidate_count=len(normalized_new_matches),
        ), new_block
    new_subsequence = locate_already_applied_new_subsequence(
        file_lines=file_lines,
        hunk=hunk,
        path=path,
        removed_only=removed_only,
        new_block=new_block,
        warnings=warnings,
        markdown_recovery=markdown_recovery,
    )
    if new_subsequence is not None:
        return new_subsequence, new_block
    new_block_present = locate_already_applied_new_block_lines_present(
        file_lines=file_lines,
        hunk=hunk,
        path=path,
        removed_only=removed_only,
        new_block=new_block,
        warnings=warnings,
        markdown_recovery=markdown_recovery,
    )
    if new_block_present is not None:
        return new_block_present, new_block
    # Strategy 0b: already-applied added block.
    # Useful when an insertion is already present, but surrounding context drifted.
    # Unlike Strategy 0, these checks require exactly one match (== 1).
    if added_only:
        added_matches = find_exact_block(file_lines, added_only)
        if len(added_matches) == 1:
            return MatchResult(
                found=False,
                confidence=0.97,
                method="already_applied_added_block",
                warnings=warnings + ["Added block already exists uniquely; hunk appears already applied."],
                missing_lines=[],
                candidate_count=1,
            ), new_block
        normalized_added_matches = find_normalized_block(file_lines, added_only)
        if len(normalized_added_matches) == 1:
            return MatchResult(
                found=False,
                confidence=0.95,
                method="already_applied_added_block_normalized",
                warnings=warnings + ["Whitespace-normalized added block already exists uniquely; hunk appears already applied."],
                missing_lines=[],
                candidate_count=1,
            ), new_block
        if markdown_recovery and path.suffix.lower() in MARKDOWN_EXTENSIONS:
            markdown_added_matches = find_markdown_equiv_block(file_lines, added_only)
            if len(markdown_added_matches) == 1:
                return MatchResult(
                    found=False,
                    confidence=0.94,
                    method="already_applied_added_block_markdown_equiv",
                    warnings=warnings + ["Markdown-equivalent added block already exists uniquely; hunk appears already applied."],
                    missing_lines=[],
                    candidate_count=1,
                ), new_block
    added_present = locate_already_applied_added_lines_present(
        file_lines=file_lines,
        hunk=hunk,
        path=path,
        removed_only=removed_only,
        added_only=added_only,
        warnings=warnings,
        markdown_recovery=markdown_recovery,
    )
    if added_present is not None:
        return added_present, new_block
    added_subsequence = locate_already_applied_added_subsequence(
        file_lines=file_lines,
        hunk=hunk,
        path=path,
        removed_only=removed_only,
        added_only=added_only,
        warnings=warnings,
        markdown_recovery=markdown_recovery,
    )
    if added_subsequence is not None:
        return added_subsequence, new_block
    # Strategy 0c: human-style edit atom fallback.
    #
    # Run this early. AI diffs often contain several small human-obvious edits
    # in one malformed hunk. If we wait until after strict block strategies,
    # stale context or duplicated fuzzy windows can poison the hunk.
    atom = locate_edit_atom_fallback(file_lines, hunk, path, warnings)
    if atom is not None:
        # NOTE(review): returned as-is, so the helper presumably already
        # yields a (MatchResult, replacement) pair — confirm at its definition.
        return atom
    # Strategy 1: exact old block.
    matches = find_exact_block(file_lines, old_block)
    if len(matches) == 1:
        start, end = matches[0]
        return MatchResult(True, start, end, 0.98, "exact_old_block", warnings, [], 1), new_block
    if len(matches) > 1:
        return MatchResult(
            False,
            confidence=0.70,
            method="ambiguous_exact_old_block",
            warnings=warnings + [f"Exact old block matched {len(matches)} places; skipped for safety."],
            candidate_count=len(matches),
        ), new_block
    # Strategy 2: exact removed-only block, useful when context has drifted.
    if removed_only:
        matches = find_exact_block(file_lines, removed_only)
        if len(matches) == 1:
            start, end = matches[0]
            local_warnings = warnings[:]
            if context_only:
                local_warnings.append("Matched removed lines without full context; review recommended.")
            # Only the removed lines were matched, so only the added lines
            # are returned as their replacement (not the full new_block).
            return MatchResult(True, start, end, 0.94, "exact_removed_block", local_warnings, [], 1), added_only
        if len(matches) > 1:
            warnings.append(f"Removed block matched {len(matches)} places; checking stronger strategies.")
    # Strategy 3: Markdown bullet recovery variants for old block.
    # The first variant is skipped via [1:] — presumably the unmodified
    # old_block already tried in Strategy 1; verify against the helper.
    for variant in expand_markdown_old_block_variants(old_block, path, markdown_recovery)[1:]:
        matches = find_exact_block(file_lines, variant)
        if len(matches) == 1:
            start, end = matches[0]
            return MatchResult(
                True,
                start,
                end,
                0.91,
                "markdown_bullet_recovery_old_block",
                warnings + ["Applied Markdown bullet recovery for old block."],
                [],
                1,
            ), new_block
    # Strategy 4: normalized old block.
    matches = find_normalized_block(file_lines, old_block)
    if len(matches) == 1:
        start, end = matches[0]
        return MatchResult(
            True,
            start,
            end,
            0.90,
            "normalized_old_block",
            warnings + ["Whitespace-normalized match used."],
            [],
            1,
        ), new_block
    if len(matches) > 1:
        warnings.append(f"Normalized old block matched {len(matches)} places; skipped normalized strategy.")
    # Strategy 4a: Markdown-equivalent removed block.
    # Handles small replace atoms where patch says `* item` but target uses `- item`,
    # or vice versa.
    if removed_only and added_only and markdown_recovery and path.suffix.lower() in MARKDOWN_EXTENSIONS:
        matches = find_markdown_equiv_block(file_lines, removed_only)
        if len(matches) == 1:
            start, end = matches[0]
            replacement = replacement_preserving_matched_context(
                hunk=hunk,
                matched_old_lines=file_lines[start:end],
                path=path,
            )
            return MatchResult(
                True,
                start,
                end,
                0.92,
                "markdown_equiv_removed_block",
                warnings + ["Markdown bullet-marker-equivalent removed block match used."],
                [],
                1,
            ), replacement
        if len(matches) > 1:
            warnings.append(f"Markdown-equivalent removed block matched {len(matches)} places; skipped strategy.")
    # Strategy 4b: Markdown-equivalent old block.
    # This handles target files that use '-' bullets while the AI patch uses '*'
    # bullets, or vice versa. Replacement preserves matched target context lines.
    if markdown_recovery and path.suffix.lower() in MARKDOWN_EXTENSIONS:
        matches = find_markdown_equiv_block(file_lines, old_block)
        if len(matches) == 1:
            start, end = matches[0]
            replacement = replacement_preserving_matched_context(
                hunk=hunk,
                matched_old_lines=file_lines[start:end],
                path=path,
            )
            return MatchResult(
                True,
                start,
                end,
                0.91,
                "markdown_equiv_old_block",
                warnings + ["Markdown bullet-marker-equivalent block match used."],
                [],
                1,
            ), replacement
        if len(matches) > 1:
            warnings.append(f"Markdown-equivalent old block matched {len(matches)} places; skipped strategy.")
    # Strategy 5: insertion between exact context lines.
    # Works for hunks with only additions plus context.
    if added_only and not removed_only and len(context_only) >= 1:
        # Use full old_block as context if possible; replacement should insert at its location.
        matches = find_exact_block(file_lines, context_only)
        if len(matches) == 1:
            start, end = matches[0]
            # Replacing context-only with context+additions is safe when context block is unique.
            return MatchResult(
                True,
                start,
                end,
                0.92,
                "exact_context_insert",
                warnings,
                [],
                1,
            ), new_block
        if len(matches) > 1:
            return MatchResult(
                False,
                confidence=0.65,
                method="ambiguous_context_insert",
                warnings=warnings + [f"Insertion context matched {len(matches)} places; skipped."],
                candidate_count=len(matches),
            ), new_block
    # Strategy 6: tail-anchor evidence, report-only for now unless very strong.
    anchors = [a for a in (tail_anchor(line) for line in old_block) if a]
    if anchors:
        joined_file = "\n".join(file_lines)
        found_anchors = [a for a in anchors if a in joined_file]
        # Only adds a warning; no match is ever returned from this strategy.
        if found_anchors and len(found_anchors) == len(anchors):
            warnings.append("Tail anchors were present, but no safe unique block match was found.")
    # Strategy 7: fuzzy window fallback.
    candidates = fuzzy_window_candidates(file_lines, old_block)
    if candidates:
        # Each candidate is unpacked below as (start, end, score); the first
        # one is taken as the best — assumes the helper returns them sorted
        # best-first; verify against fuzzy_window_candidates.
        best = candidates[0]
        # Candidates scoring within 0.03 of the best; len(close) == 1 means
        # the best candidate has no near rival.
        close = [c for c in candidates if best[2] - c[2] < 0.03]
        if best[2] >= 0.98 and len(close) == 1 and added_only and not removed_only:
            start, end, score = best
            return MatchResult(
                True,
                start,
                end,
                0.91,
                "near_exact_fuzzy_insert_only_window",
                warnings + [f"Near-exact fuzzy insert-only window used; score={score:.2f}."],
                [],
                len(candidates),
            ), new_block
        # Small replacement at the very top of the file: accepted at a lower
        # score (0.88) when both the header hint and the candidate start are
        # within the first few lines and the block is at most 14 lines.
        old_start_hint_for_top = parse_hunk_old_start(hunk.header)
        if (
            old_start_hint_for_top is not None
            and old_start_hint_for_top <= 2
            and best[2] >= 0.88
            and best[0] <= 3
            and len(new_block) <= 14
            and 0 <= (best[1] - best[0]) <= 14
            and removed_only
            and added_only
        ):
            start, end, score = best
            return MatchResult(
                True,
                start,
                end,
                0.91,
                "top_of_file_fuzzy_replacement",
                warnings + [
                    f"Top-of-file fuzzy replacement promoted; score={score:.2f}."
                ],
                [],
                len(candidates),
            ), new_block
        if best[2] >= 0.94 and len(close) == 1:
            start, end, score = best
            # Small insert-only hunks get a fixed 0.91 confidence; any other
            # fuzzy match is capped at 0.89.
            is_bounded_insert_only = (
                added_only
                and not removed_only
                and score >= 0.94
                and len(added_only) <= 12
                and len(new_block) <= 40
                and (end - start) <= 20
            )
            return MatchResult(
                True,
                start,
                end,
                0.91 if is_bounded_insert_only else min(0.89, score),
                "bounded_fuzzy_insert_only_window" if is_bounded_insert_only else "fuzzy_window",
                warnings + [
                    f"Bounded fuzzy insert-only window used; score={score:.2f}."
                    if is_bounded_insert_only
                    else "Fuzzy window match used; review recommended."
                ],
                [],
                len(candidates),
            ), new_block
        guided = header_guided_fuzzy_candidate(
            candidates=candidates,
            hunk=hunk,
            score_floor=0.94,
            max_distance=90,
        )
        if guided and len(new_block) <= 90:
            start, end, score = guided
            if 0 <= (end - start) <= 45:
                return MatchResult(
                    True,
                    start,
                    end,
                    0.91,
                    "header_guided_fuzzy_window",
                    warnings + [
                        f"Header-guided fuzzy window used as tie-breaker; score={score:.2f}."
                    ],
                    [],
                    len(candidates),
                ), new_block
        # Last fuzzy tie-breaker: if the best fuzzy score is effectively exact
        # but duplicated, use the hunk header old-start line as a locality hint.
        #
        # This is safer than lowering fuzzy thresholds generally: it only fires
        # for near-perfect matches, bounded replacements, and a candidate close
        # to the original hunk location.
        old_start_hint = parse_hunk_old_start(hunk.header)
        if old_start_hint is not None and best[2] >= 0.99 and len(new_block) <= 120:
            close_exact = [c for c in candidates if best[2] - c[2] < 0.01]
            ranked = sorted(close_exact, key=lambda c: abs(c[0] - old_start_hint))
            if ranked:
                start, end, score = ranked[0]
                if abs(start - old_start_hint) <= 140 and 0 <= (end - start) <= 60:
                    return MatchResult(
                        True,
                        start,
                        end,
                        0.91,
                        "header_guided_perfect_fuzzy_window",
                        warnings + [
                            f"Header-guided perfect fuzzy window used as tie-breaker; score={score:.2f}."
                        ],
                        [],
                        len(candidates),
                    ), new_block
        warnings.append(
            f"Best fuzzy candidate score {best[2]:.2f}, but not strong/unique enough for automatic apply."
        )
    # Nothing matched: report which expected lines are absent, preferring the
    # removed lines and falling back to the whole old block.
    missing = line_presence_missing(file_lines, removed_only or old_block, path, markdown_recovery)
    return MatchResult(
        False,
        confidence=0.0,
        method="not_found",
        warnings=warnings,
        missing_lines=missing,
        candidate_count=len(candidates),
    ), new_block
def detect_newline(raw: bytes) -> str:
    """Return the newline convention of `raw`.

    Any CRLF pair anywhere in the data selects "\\r\\n"; otherwise "\\n".
    """
    return "\r\n" if b"\r\n" in raw else "\n"


def bytes_to_lines(raw: bytes) -> tuple[list[str], str, bool]:
    """Decode UTF-8 file bytes into lines without terminators.

    Returns ``(lines, newline, has_final_newline)``, the three values
    lines_to_bytes needs to rebuild the file.

    Lines are split only on "\\r\\n", "\\r" and "\\n". ``str.splitlines()`` is
    deliberately avoided: it also breaks on form feed, vertical tab,
    U+001C-U+001E, U+0085, U+2028 and U+2029, which would cut one file line
    in two and turn that character into a newline when written back.

    Raises:
        UnicodeDecodeError: if `raw` is not valid UTF-8.
    """
    newline = detect_newline(raw)
    text = raw.decode("utf-8")
    has_final_newline = text.endswith("\n")
    # Fold every supported terminator to "\n", then split on that alone.
    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    # A trailing terminator (or empty input) leaves one empty tail element;
    # drop it so the result has the same shape splitlines() would give.
    if lines and lines[-1] == "":
        lines.pop()
    return lines, newline, has_final_newline
def lines_to_bytes(lines: list[str], newline: str, has_final_newline: bool) -> bytes:
    """Join `lines` with `newline` and encode the result as UTF-8.

    A closing `newline` is appended only when `has_final_newline` is true.
    """
    terminator = newline if has_final_newline else ""
    return (newline.join(lines) + terminator).encode("utf-8")
def safe_target_path(root: Path, rel: str) -> Path:
    """Resolve `rel` under `root` and refuse anything that escapes it.

    Returns the resolved absolute path.

    Raises:
        ValueError: if the resolved path is not located inside the resolved
            root (for example because of ".." segments).
    """
    resolved = (root / rel).resolve()
    base = root.resolve()
    try:
        # relative_to() raises ValueError when `resolved` is not under `base`.
        resolved.relative_to(base)
    except ValueError:
        raise ValueError(f"Refusing path outside root: {rel}")
    else:
        return resolved
def repair_markdown_inline_text_fence_artifact(line: str) -> str:
    """Unwrap an obviously broken single-line `text` pseudo-fence.

    A line that, once stripped, consists of one to three backticks, the word
    ``text``, whitespace, some content and optional closing backticks is
    reduced to that content. One leading and one trailing "+" (diff markers
    carried into the fake fence) are also removed. For example

        `text +shopRef + itemRef +`

    becomes

        shopRef + itemRef

    Any line that does not have this shape is returned untouched.
    """
    found = re.match(r"^`{1,3}text\s+(?P<body>.+?)`*$", line.strip())
    if found is None:
        return line

    payload = found.group("body").strip()
    # AI diffs sometimes preserve diff + markers inside the fake text fence.
    if payload[:1] == "+":
        payload = payload[1:].strip()
    if payload[-1:] == "+":
        payload = payload[:-1].strip()
    return payload
def sanitize_replacement_lines(lines: list[str], path: Path) -> list[str]:
    """Clean replacement lines before marker-leak detection and writing.

    Non-Markdown targets get the input list back unchanged. For Markdown,
    each line has broken inline `text +...+` pseudo-fences repaired and its
    trailing whitespace removed, so those AI/diff artifacts are not written.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return lines
    return [
        repair_markdown_inline_text_fence_artifact(entry).rstrip()
        for entry in lines
    ]
def suspicious_markdown_marker_leaks(lines: list[str], path: Path) -> list[str]:
    """Detect likely leaked diff markers in Markdown output.

    These usually mean smartpatch accidentally wrote patch syntax as document
    text. Legit Markdown + bullets use '+ item' with a space, so '+* item',
    '+```text', '++foo', '+A real sentence', etc. are suspicious outside code
    fences. A line starting with '+```' is reported wherever it occurs,
    including inside a fence.

    Returns:
        One message per suspicious line (1-based index within *lines*);
        an empty list for non-Markdown targets.
    """
    if path.suffix.lower() not in MARKDOWN_EXTENSIONS:
        return []
    leaks: list[str] = []
    in_fence = False
    for idx, line in enumerate(lines, start=1):
        # A literal +``` is suspicious before toggling. This must be tested
        # ahead of the fence check: such a line never *starts* with backticks,
        # so nesting the test under the fence branch made it unreachable and
        # let a leaked closing fence inside a code block go unnoticed.
        if line.startswith("+```"):
            leaks.append(f"replacement line {idx}: suspicious leaked marker `{line}`")
            continue
        if line.strip().startswith("```"):
            in_fence = not in_fence
            continue
        if in_fence:
            continue
        if line.startswith(("++", "+*", "+-", "+#")):
            leaks.append(f"replacement line {idx}: suspicious leaked marker `{line}`")
            continue
        if line.startswith("+") and not line.startswith("+ "):
            leaks.append(f"replacement line {idx}: suspicious literal plus `{line}`")
    return leaks
def _is_located_fuzzy_window(match: MatchResult, floor: float) -> bool:
    """Return True for a not-yet-promoted fuzzy_window match with a concrete span and confidence >= floor."""
    return bool(
        match.method == "fuzzy_window"
        and match.found
        and match.confidence >= floor
        and match.start is not None
        and match.end is not None
    )


def apply_patch_file(
    patch_file: PatchFile,
    root: Path,
    dry_run: bool,
    min_confidence: float,
    markdown_recovery: bool,
    backup: bool,
) -> FileReport:
    """Apply (or dry-run) every hunk of one patched file and report the outcome.

    Hunks are located one after another against the in-memory lines, so each
    hunk sees the edits of the hunks applied before it. The file is written
    only once, after the loop, and only when something changed and *dry_run*
    is false.

    Args:
        patch_file: parsed per-file patch (target path, hunks, parse warnings).
        root: directory the target path must stay inside.
        dry_run: when true, count hunks as "would_apply" and never write.
        min_confidence: threshold a match must reach to be applied.
        markdown_recovery: forwarded to ``locate_hunk``.
        backup: when true, copy an existing target to a timestamped ``.bak``
            file before writing.

    Returns:
        A FileReport with counters, file-level warnings and one HunkReport
        per processed hunk. Files without a target path, outside *root*,
        unreadable, or not valid UTF-8 are skipped as a whole.
    """
    rel = patch_file.target_path or "<unknown>"
    report = FileReport(file=rel, exists=False, hunks_total=len(patch_file.hunks))
    report.warnings.extend(patch_file.parse_warnings)

    if not patch_file.target_path:
        report.skipped = len(patch_file.hunks)
        report.warnings.append("No target path; skipped file.")
        return report

    try:
        path = safe_target_path(root, patch_file.target_path)
    except ValueError as exc:
        report.skipped = len(patch_file.hunks)
        report.warnings.append(str(exc))
        return report

    if path.exists():
        report.exists = True
        try:
            raw = path.read_bytes()
        except OSError as exc:
            # The target may be a directory or unreadable; skip this file
            # instead of aborting the whole run with a traceback.
            report.skipped = len(patch_file.hunks)
            report.warnings.append(f"Could not read target file ({exc}); skipped.")
            return report
        try:
            file_lines, newline, has_final_newline = bytes_to_lines(raw)
        except UnicodeDecodeError:
            report.skipped = len(patch_file.hunks)
            report.warnings.append("File is not valid UTF-8; skipped.")
            return report
    else:
        # Missing target: start from an empty file with LF line endings.
        report.exists = False
        file_lines = []
        newline = "\n"
        has_final_newline = True

    changed = False
    current_lines = file_lines[:]
    action: Literal["applied", "would_apply"] = "would_apply" if dry_run else "applied"

    for index, hunk in enumerate(patch_file.hunks, start=1):
        match, replacement = locate_hunk(current_lines, hunk, path, min_confidence, markdown_recovery)
        replacement = sanitize_replacement_lines(replacement, path)

        _old_block, _new_block, removed_only, added_only, _context_only = old_new_blocks(hunk)
        # "Small add-only" hunk: adds at most 12 lines and removes nothing.
        small_add_only = bool(added_only) and not removed_only and len(added_only) <= 12

        # Promote bounded add-only fuzzy windows.
        #
        # locate_hunk() intentionally caps generic fuzzy_window confidence at 0.89.
        # For add-only hunks, this can be too strict when the fuzzy match is already
        # unique enough to return a concrete location. Keep this conservative:
        # add-only, no deletions, small replacement, small matched span,
        # concrete location.
        if _is_located_fuzzy_window(match, 0.89):
            if small_add_only and match.end - match.start <= 20:
                match.confidence = max(match.confidence, min_confidence)
                match.method = "promoted_fuzzy_insert_only_window"
                match.warnings.append(
                    "Fuzzy insert-only window promoted because it is add-only, bounded, and had a concrete unique location."
                )

        # Promote bounded fuzzy windows that only expand a small matched span.
        # Not reached when the promotion above fired (the method was renamed).
        # NOTE(review): unlike the promotion above, this one does not require
        # the hunk to be add-only — confirm that is intended.
        if _is_located_fuzzy_window(match, 0.89):
            span = match.end - match.start
            if 0 <= span <= 20 and span < len(replacement) <= 40:
                match.confidence = max(match.confidence, min_confidence)
                match.method = "promoted_bounded_fuzzy_expanding_window"
                match.warnings.append(
                    "Bounded fuzzy expanding window promoted over threshold."
                )

        # Apply gate: slightly lower floor (0.88) for small add-only windows.
        allow_fuzzy_window = (
            _is_located_fuzzy_window(match, 0.88)
            and small_add_only
            and 0 <= match.end - match.start <= 20
            and len(replacement) <= 40
        )
        if allow_fuzzy_window:
            match.confidence = max(match.confidence, min_confidence)
            match.method = "promoted_bounded_fuzzy_insert_only_window"
            match.warnings.append(
                "Bounded fuzzy insert-only window promoted over threshold in apply gate."
            )

        if not allow_fuzzy_window and (
            not match.found
            or match.confidence < min_confidence
            or match.start is None
            or match.end is None
        ):
            is_already_applied = match.method.startswith("already_applied")
            is_noop = match.method.startswith("noop_")
            last_chance_leaks: list[str] = []

            # Last-chance safe promotion for bounded fuzzy insert windows.
            # This handles cases where locate_hunk found a concrete fuzzy_window
            # at ~0.89, but the earlier gate did not fire due to parser drift.
            if (
                _is_located_fuzzy_window(match, 0.87)
                and 0 <= (match.end - match.start) <= 20
                and (match.end - match.start) <= len(replacement) <= 40
            ):
                last_chance_leaks = suspicious_markdown_marker_leaks(replacement, path)
                if not last_chance_leaks:
                    current_lines = current_lines[: match.start] + replacement + current_lines[match.end :]
                    changed = True
                    if dry_run:
                        report.would_apply += 1
                    else:
                        report.applied += 1
                    report.hunk_reports.append(
                        HunkReport(
                            file=rel,
                            hunk_index=index,
                            action=action,
                            confidence=max(match.confidence, min_confidence),
                            method="last_chance_promoted_bounded_fuzzy_window",
                            start_line=match.start + 1,
                            end_line=match.end,
                            warnings=match.warnings + [
                                "Last-chance bounded fuzzy window promoted; no marker leaks detected."
                            ],
                            missing_lines=[],
                        )
                    )
                    continue

            # No-op hunks are counted together with already-applied ones.
            if is_already_applied or is_noop:
                report.already_applied += 1
            else:
                report.skipped += 1

            details = ""
            if is_already_applied:
                details = "Hunk appears already applied; no action needed."
            elif is_noop:
                details = "Hunk has no requested changes; no action needed."
            elif len(match.missing_lines) >= 3:
                details = "Several expected lines were not found; this may be the wrong file, wrong branch, or stale AI diff."
            elif len(match.missing_lines) >= 1:
                details = "One or more expected lines were not found."

            report.hunk_reports.append(
                HunkReport(
                    file=rel,
                    hunk_index=index,
                    action="already_applied" if (is_already_applied or is_noop) else "skipped",
                    confidence=match.confidence,
                    method=match.method,
                    start_line=None if match.start is None else match.start + 1,
                    end_line=None if match.end is None else match.end,
                    # Surface marker leaks that blocked the last-chance
                    # promotion so the report explains the skip.
                    warnings=match.warnings + last_chance_leaks,
                    missing_lines=match.missing_lines,
                    details=details,
                )
            )
            continue

        marker_leaks = suspicious_markdown_marker_leaks(replacement, path)
        if marker_leaks:
            report.skipped += 1
            report.hunk_reports.append(
                HunkReport(
                    file=rel,
                    hunk_index=index,
                    action="skipped",
                    confidence=0.0,
                    method="suspicious_marker_leak",
                    start_line=match.start + 1,
                    end_line=match.end,
                    warnings=match.warnings + marker_leaks,
                    missing_lines=[],
                    details="Suspicious literal diff markers would be written; hunk skipped.",
                )
            )
            continue

        # Apply to in-memory lines.
        current_lines = current_lines[: match.start] + replacement + current_lines[match.end :]
        changed = True
        if dry_run:
            report.would_apply += 1
        else:
            report.applied += 1
        report.hunk_reports.append(
            HunkReport(
                file=rel,
                hunk_index=index,
                action=action,
                confidence=match.confidence,
                method=match.method,
                start_line=match.start + 1,
                end_line=match.end,
                warnings=match.warnings,
                missing_lines=match.missing_lines,
            )
        )

    if changed and not dry_run:
        if backup and path.exists():
            stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            backup_path = path.with_name(f"{path.name}.smartpatch-{stamp}.bak")
            shutil.copy2(path, backup_path)
            report.warnings.append(f"Backup written: {backup_path}")
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(lines_to_bytes(current_lines, newline, has_final_newline))

    return report
def markdown_report(run: RunReport) -> str:
    """Render *run* as a Markdown document.

    Layout: a run summary, optional run-level warnings, then one section per
    file with its counters, warnings, and one sub-section per hunk. At most 20
    missing lines are listed per hunk; the remainder is summarized as a count.
    """
    mode = "dry-run" if run.dry_run else "write"
    out: list[str] = [
        "# Smartpatch Report",
        "",
        "## Summary",
        "",
        f"- Patch: `{run.patch}`",
        f"- Root: `{run.root}`",
        f"- Mode: `{mode}`",
        f"- Minimum confidence: `{run.min_confidence:.2f}`",
        f"- Files: `{run.files_total}`",
        f"- Hunks: `{run.hunks_total}`",
        f"- Applied: `{run.applied}`",
        f"- Would apply: `{run.would_apply}`",
        f"- Already applied: `{run.already_applied}`",
        f"- Skipped: `{run.skipped}`",
        "",
    ]

    if run.warnings:
        out += ["## Run Warnings", ""]
        out.extend(f"- {note}" for note in run.warnings)
        out.append("")

    for file_report in run.file_reports:
        out += [
            f"## {file_report.file}",
            "",
            f"- Exists: `{file_report.exists}`",
            f"- Hunks: `{file_report.hunks_total}`",
            f"- Applied: `{file_report.applied}`",
            f"- Would apply: `{file_report.would_apply}`",
            f"- Already applied: `{file_report.already_applied}`",
            f"- Skipped: `{file_report.skipped}`",
        ]
        if file_report.warnings:
            out.append("- Warnings:")
            out.extend(f" - {note}" for note in file_report.warnings)
        out.append("")

        for hunk_report in file_report.hunk_reports:
            out += [
                f"### Hunk {hunk_report.hunk_index} — {hunk_report.action}",
                "",
                f"- Confidence: `{hunk_report.confidence:.2f}`",
                f"- Method: `{hunk_report.method}`",
            ]
            if hunk_report.start_line is not None:
                out.append(f"- Location: lines `{hunk_report.start_line}`-`{hunk_report.end_line}`")
            if hunk_report.details:
                out.append(f"- Assessment: {hunk_report.details}")
            if hunk_report.warnings:
                out.append("- Warnings:")
                out.extend(f" - {note}" for note in hunk_report.warnings)
            if hunk_report.missing_lines:
                out.append("- Missing expected lines:")
                out.extend(f" - `{absent}`" for absent in hunk_report.missing_lines[:20])
                overflow = len(hunk_report.missing_lines) - 20
                if overflow > 0:
                    out.append(f" - ...and {overflow} more")
            out.append("")

    return "\n".join(out)
def safety_verdict(run: RunReport) -> tuple[str, str]:
    """Summarize *run* as a ``(verdict, reason)`` pair.

    Verdicts, checked in this order: UNSAFE (no hunks, or any skipped hunk),
    SAFE (dry run with hunks that would apply), APPLIED (write run with
    applied hunks), NOOP (only already-applied hunks), otherwise UNSAFE.
    """
    already = run.already_applied
    if run.hunks_total == 0:
        outcome = ("UNSAFE", "no actionable hunks found")
    elif run.skipped > 0:
        outcome = ("UNSAFE", f"{run.skipped} skipped hunk(s)")
    elif run.dry_run and run.would_apply > 0:
        outcome = ("SAFE", f"{run.would_apply} hunk(s) would apply, {already} already applied")
    elif not run.dry_run and run.applied > 0:
        outcome = ("APPLIED", f"{run.applied} hunk(s) applied, {already} already applied")
    elif already > 0:
        outcome = ("NOOP", f"all {already} hunk(s) already applied")
    else:
        outcome = ("UNSAFE", "nothing applyable found")
    return outcome
def print_console_summary(run: RunReport) -> None:
    """Print the verdict line and counters; on UNSAFE also list up to five skipped hunks."""
    verdict, reason = safety_verdict(run)
    print(f"{verdict}: {reason}")
    print(
        f"files={run.files_total} hunks={run.hunks_total} "
        f"would_apply={run.would_apply} already_applied={run.already_applied} skipped={run.skipped}"
    )
    if verdict != "UNSAFE":
        return

    skipped_hunks = [
        (file_report.file, hunk_report)
        for file_report in run.file_reports
        for hunk_report in file_report.hunk_reports
        if hunk_report.action == "skipped"
    ]
    # Only the first five skipped hunks are shown on the console.
    for file_name, hunk_report in skipped_hunks[:5]:
        print(
            f"- {file_name}: hunk {hunk_report.hunk_index} skipped "
            f"({hunk_report.method}, confidence={hunk_report.confidence:.2f})"
        )
def detect_git_root(start: Path) -> Path | None:
    """Return the git work-tree root containing *start*, or None.

    Runs ``git rev-parse --show-toplevel`` with *start* as working directory.
    None is returned when git exits non-zero, prints nothing, or cannot be
    launched at all.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            cwd=start,
            check=True,
            capture_output=True,
            text=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing git binary (FileNotFoundError) as well as
        # an unusable working directory or executable (NotADirectoryError,
        # PermissionError); detection is best-effort in all of these cases.
        return None
    root = result.stdout.strip()
    return Path(root).resolve() if root else None
def resolve_root(root_arg: str | None) -> Path:
    """Pick the root directory: explicit argument, else git root, else the current directory."""
    if not root_arg:
        git_root = detect_git_root(Path.cwd())
        return git_root or Path.cwd().resolve()
    explicit = Path(root_arg).expanduser()
    return explicit.resolve()
def default_report_path(root: Path, patch_path: Path) -> Path:
    """Return ``<root>/tmp/<patch stem>_patch-report.md``."""
    report_name = f"{patch_path.stem}_patch-report.md"
    return root.joinpath("tmp", report_name)
def resolve_report_path(report_arg: str | None, root: Path, patch_path: Path) -> Path | None:
    """Map the ``--report`` value to a path.

    None means no report was requested, the sentinel "AUTO" selects the
    default location, and anything else is used as given (with ``~`` expanded).
    """
    if report_arg is None:
        return None
    if report_arg != "AUTO":
        return Path(report_arg).expanduser()
    return default_report_path(root, patch_path)
def cmd_apply(args: argparse.Namespace) -> int:
    """Run the ``apply`` sub-command.

    Reads and repairs the patch, applies it file by file (dry run unless
    ``args.write``), emits the optional Markdown/JSON reports and prints the
    console summary.

    Returns:
        2 when the patch or root is missing or the patch cannot be read as
        UTF-8, 1 when at least one hunk was skipped, otherwise 0.
    """
    patch_path = Path(args.patch).expanduser().resolve()
    root = resolve_root(args.root)
    dry_run = not args.write

    if not patch_path.exists():
        print(f"Patch not found: {patch_path}", file=sys.stderr)
        return 2
    if not root.exists():
        print(f"Root not found: {root}", file=sys.stderr)
        return 2

    try:
        patch_text = patch_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # A directory, an unreadable file or non-UTF-8 bytes is an input
        # error like the two above: report it instead of crashing.
        print(f"Could not read patch {patch_path}: {exc}", file=sys.stderr)
        return 2

    patch_text, repair_warnings = repair_ai_patch_text(patch_text, root)
    patch_files, parse_warnings = parse_patch(patch_text)
    parse_warnings = repair_warnings + parse_warnings

    run = RunReport(
        patch=str(patch_path),
        root=str(root),
        dry_run=dry_run,
        min_confidence=args.min_confidence,
        files_total=len(patch_files),
        hunks_total=sum(len(pf.hunks) for pf in patch_files),
        warnings=parse_warnings,
    )

    for pf in patch_files:
        fr = apply_patch_file(
            pf,
            root=root,
            dry_run=dry_run,
            min_confidence=args.min_confidence,
            markdown_recovery=not args.no_markdown_bullet_recovery,
            backup=args.backup,
        )
        run.file_reports.append(fr)
        run.applied += fr.applied
        run.would_apply += fr.would_apply
        run.already_applied += fr.already_applied
        run.skipped += fr.skipped

    report_text = markdown_report(run)
    if args.verbose:
        print(report_text)

    report_path = resolve_report_path(args.report, root, patch_path)
    if report_path:
        # Relative report paths are anchored at the root, not the cwd.
        if not report_path.is_absolute():
            report_path = root / report_path
        report_path.parent.mkdir(parents=True, exist_ok=True)
        report_path.write_text(report_text, encoding="utf-8")
        print(f"Report written: {report_path}")

    if args.json_report:
        # NOTE(review): unlike the Markdown report, a relative JSON path is
        # resolved against the current directory — confirm this is intended.
        json_path = Path(args.json_report).expanduser()
        json_path.parent.mkdir(parents=True, exist_ok=True)
        json_path.write_text(json.dumps(asdict(run), indent=2, ensure_ascii=False), encoding="utf-8")

    print_console_summary(run)
    # NOTE(review): the exit code tracks skipped hunks only; a run whose
    # console verdict is UNSAFE for having no hunks still exits 0.
    return 1 if run.skipped else 0
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with its single required ``apply`` sub-command."""
    top = argparse.ArgumentParser(
        prog="smartpatch.py",
        description="Conservative fuzzy patch applier for AI-made unified diffs.",
    )
    commands = top.add_subparsers(dest="command", required=True)

    apply_cmd = commands.add_parser("apply", help="dry-run or apply an AI-made diff")
    apply_cmd.add_argument("patch", help="path to .diff/.patch file")
    apply_cmd.add_argument(
        "--root",
        default=None,
        help="repo/root directory; default: auto-detected git root, else current directory",
    )
    apply_cmd.add_argument(
        "--write",
        action="store_true",
        help="actually modify files; default is dry-run",
    )
    apply_cmd.add_argument(
        "--backup",
        action="store_true",
        help="write .smartpatch timestamp backups before modifying files",
    )
    apply_cmd.add_argument(
        "--min-confidence",
        type=float,
        default=0.90,
        help="minimum confidence required to apply; default: 0.90",
    )
    # A bare --report (no value) stores the "AUTO" sentinel.
    apply_cmd.add_argument(
        "--report",
        nargs="?",
        const="AUTO",
        help="also write markdown report; default path: tmp/<diff-filename>_patch-report.md",
    )
    apply_cmd.add_argument("--json-report", help="write JSON report to this path")
    apply_cmd.add_argument(
        "--verbose",
        action="store_true",
        help="print full markdown report to terminal",
    )
    apply_cmd.add_argument(
        "--no-markdown-bullet-recovery",
        action="store_true",
        help="disable Markdown fallback matching for AI diffs that omit bullet markers",
    )
    apply_cmd.set_defaults(func=cmd_apply)
    return top
def main(argv: list[str] | None = None) -> int:
    """Parse *argv* (``sys.argv`` when None) and dispatch to the sub-command handler."""
    namespace = build_parser().parse_args(argv)
    handler = namespace.func
    return handler(namespace)


if __name__ == "__main__":
    # Exit with the handler's return code.
    sys.exit(main())
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
smartpatch.py treats AI diffs as intent rather than perfect Git patches: ignores unreliable hunk line numbers, finds edits by exact/context matching, handles Markdown list hyphens safely, detects already-applied hunks, and refuses low-confidence or ambiguous changes. Dry-run is default, with optional write mode, backups, and Markdown reports.