Created
March 15, 2026 21:33
-
-
Save tianyuf/c8e9b110e3f4224cfbcce9d918fb0d33 to your computer and use it in GitHub Desktop.
OCR archival scans and link results back to document metadata
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| OCR archival scans and link results back to document metadata. | |
| Usage: | |
| python ocr_archive.py <archive_folder> [--engine tesseract|qwen] [--dpi 300] [--force] | |
| Examples: | |
| python ocr_archive.py SHMA # tesseract, default | |
| python ocr_archive.py SHMA --engine qwen # Qwen3-VL via OpenRouter | |
| python ocr_archive.py SHMA --engine qwen --dpi 200 # lower DPI = smaller images = faster | |
| python ocr_archive.py SHMA --link-only # just update doc metadata | |
| Folder structure expected: | |
| <archive_folder>/ | |
| ├── docs/ # Markdown files with YAML frontmatter | |
| ├── scans/ # PDFs and/or images (JPEG, PNG, TIFF) | |
| └── ocr/ # Created by this script | |
| Output: | |
| ocr/<scan_stem>_ocr.md # OCR markdown with per-page sections | |
| OCR engines: | |
| tesseract Local tesseract with chi_sim+eng (fast, free, decent) | |
| qwen Qwen3-VL-8B via OpenRouter API (better for handwriting/complex layouts) | |
| Requires OPENROUTER_API_KEY env var or .env file in archive root. | |
| """ | |
| import argparse | |
| import base64 | |
| import os | |
| import re | |
| import sys | |
| import time | |
| from pathlib import Path | |
| try: | |
| import fitz # pymupdf | |
| except ImportError: | |
| fitz = None | |
| try: | |
| from PIL import Image as PILImage | |
| except ImportError: | |
| PILImage = None | |
| import subprocess | |
| # Try loading .env from script directory (where the archives live) | |
| try: | |
| from dotenv import load_dotenv | |
| _script_dir = os.path.dirname(os.path.abspath(__file__)) | |
| load_dotenv(os.path.join(_script_dir, ".env")) | |
| except ImportError: | |
| pass | |
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Vision-language model used by the "qwen" OCR engine (served via OpenRouter).
OPENROUTER_MODEL = "qwen/qwen3-vl-8b-instruct"
# Text model used only to guess document dates from OCR output
# (see _guess_date_from_ocr).
DEEPSEEK_MODEL = "deepseek/deepseek-chat-v3-0324"
# Single OpenRouter chat-completions endpoint shared by both models.
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
# Per-page instruction sent to the VL model: plain transcription only,
# simplified Chinese, layout preserved via line breaks, stamps/seals noted.
OCR_PROMPT = (
    "You are an expert OCR system for Chinese archival documents. "
    "Transcribe ALL text visible in this image. "
    "All Chinese text must be output in simplified Chinese (简体中文). "
    "Preserve the original layout as much as possible using line breaks. "
    "Output ONLY the transcribed text, nothing else. "
    "If there are stamps, seals, or handwritten annotations, note them in parentheses."
)
| # --------------------------------------------------------------------------- | |
| # Tesseract engine | |
| # --------------------------------------------------------------------------- | |
def find_tesseract():
    """Locate the tesseract binary.

    Checks common Homebrew install locations first, then falls back to
    searching PATH. Returns the path as a string, or None when tesseract
    is not installed.
    """
    import shutil
    for p in ["/opt/homebrew/bin/tesseract", "/usr/local/bin/tesseract"]:
        if os.path.isfile(p):
            return p
    # shutil.which is the portable stdlib equivalent of shelling out to
    # `which tesseract` — no subprocess, and it works on non-Unix too.
    return shutil.which("tesseract")


# Resolved once at import time; None if tesseract is unavailable.
TESSERACT = find_tesseract()
def ocr_tesseract(image_path: str, lang: str = "chi_sim+eng") -> str:
    """Run tesseract OCR on an image file and return the recognized text.

    Prints a warning and returns "" when tesseract exits non-zero.
    Raises RuntimeError when no tesseract binary was found at import time.
    """
    if not TESSERACT:
        raise RuntimeError("tesseract not found — install with: brew install tesseract")
    cmd = [TESSERACT, str(image_path), "stdout", "-l", lang, "--psm", "6"]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout
    print(f" WARNING: tesseract error: {proc.stderr.strip()}")
    return ""
| # --------------------------------------------------------------------------- | |
| # Qwen VL engine (via OpenRouter) | |
| # --------------------------------------------------------------------------- | |
def get_openrouter_key():
    """Return the OpenRouter API key from the environment, or exit(1) with a message."""
    key = os.environ.get("OPENROUTER_API_KEY", "")
    if key:
        return key
    print("ERROR: OPENROUTER_API_KEY not set.")
    print(" Set it as an environment variable or in a .env file.")
    sys.exit(1)
def image_to_data_uri(image_path: str) -> str:
    """Convert an image file to a base64 data URI.

    The MIME type is inferred from the file extension; unknown extensions
    fall back to image/png.
    """
    mime_by_ext = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".tiff": "image/tiff",
        ".tif": "image/tiff",
        ".bmp": "image/bmp",
    }
    suffix = Path(image_path).suffix.lower()
    mime = mime_by_ext.get(suffix, "image/png")
    raw = Path(image_path).read_bytes()
    encoded = base64.b64encode(raw).decode("utf-8")
    return f"data:{mime};base64,{encoded}"
def ocr_qwen(image_path: str, api_key: str = None, max_retries: int = 3) -> str:
    """OCR an image using Qwen3-VL via OpenRouter.

    Args:
        image_path: Path to the image file to transcribe.
        api_key: OpenRouter API key; when None it is resolved via
            get_openrouter_key() (which may sys.exit).
        max_retries: Total request attempts before giving up.

    Returns:
        The transcribed text (stripped), or "" if all attempts fail.
    """
    import requests
    if api_key is None:
        api_key = get_openrouter_key()
    # The image is inlined into the request body as a base64 data URI.
    data_uri = image_to_data_uri(image_path)
    payload = {
        "model": OPENROUTER_MODEL,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": OCR_PROMPT},
                    {"type": "image_url", "image_url": {"url": data_uri}},
                ],
            }
        ],
        "max_tokens": 4096,
        # Low temperature + repetition penalty keep the transcription
        # faithful and discourage the model looping on dense text.
        "temperature": 0.1,
        "repetition_penalty": 1.2,
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/archive-ocr",
    }
    for attempt in range(max_retries):
        try:
            resp = requests.post(OPENROUTER_URL, json=payload, headers=headers, timeout=120)
            # 429: back off exponentially (capped at 30s) and retry.
            # NOTE(review): this `continue` still consumes an attempt.
            if resp.status_code == 429:
                wait = min(2 ** attempt * 5, 30)
                print(f" rate limited, waiting {wait}s...", end="", flush=True)
                time.sleep(wait)
                continue
            resp.raise_for_status()
            data = resp.json()
            return data["choices"][0]["message"]["content"].strip()
        except requests.exceptions.RequestException as e:
            if attempt < max_retries - 1:
                wait = 2 ** attempt * 2
                print(f" error ({e}), retrying in {wait}s...", end="", flush=True)
                time.sleep(wait)
            else:
                print(f"\n ERROR: API request failed after {max_retries} attempts: {e}")
                return ""
    # Reached only when every attempt was rate-limited.
    return ""
| # --------------------------------------------------------------------------- | |
| # PDF / image helpers | |
| # --------------------------------------------------------------------------- | |
def extract_pdf_pages(pdf_path: str, output_dir: str, dpi: int = 150):
    """Extract each page of a PDF as a JPEG image. Returns list of paths.

    Pages are written to output_dir as page_001.jpg, page_002.jpg, ...
    rendered at the requested DPI.
    """
    if fitz is None:
        raise RuntimeError("pymupdf (fitz) is required — pip install pymupdf")
    os.makedirs(output_dir, exist_ok=True)
    # PDF user space is 72 points per inch, so dpi/72 is the render scale.
    scale = dpi / 72.0
    matrix = fitz.Matrix(scale, scale)
    doc = fitz.open(pdf_path)
    paths = []
    for index, page in enumerate(doc, start=1):
        target = os.path.join(output_dir, f"page_{index:03d}.jpg")
        page.get_pixmap(matrix=matrix).save(target, jpg_quality=85)
        paths.append(target)
    doc.close()
    return paths
def is_image_file(path: str) -> bool:
    """True if *path* has a recognized raster-image extension (case-insensitive)."""
    image_exts = (".jpg", ".jpeg", ".png", ".tiff", ".tif", ".bmp")
    return Path(path).suffix.lower() in image_exts
def is_pdf_file(path: str) -> bool:
    """True if *path* has a .pdf extension (case-insensitive)."""
    suffix = Path(path).suffix
    return suffix.lower() == ".pdf"
| # --------------------------------------------------------------------------- | |
| # Matching: scans <-> docs | |
| # --------------------------------------------------------------------------- | |
def scan_matches_doc(scan_stem: str, archive_loc: str,
                     archive_name: str = "") -> bool:
    """
    A scan matches a doc if:
      - scan_stem == archive_loc (exact), or
      - archive_loc starts with scan_stem + "-" (scan is parent volume)
    Also tries after stripping archive_name prefix from scan_stem
    (e.g. "BJMA 1-22-371-46" matches archive_loc "1-22-371-46").
    """
    if not archive_loc:
        return False

    def _hit(stem: str) -> bool:
        return archive_loc == stem or archive_loc.startswith(stem + "-")

    candidates = [scan_stem]
    # Also consider the stem with the "<archive_name> " prefix removed.
    if archive_name and scan_stem.startswith(archive_name + " "):
        candidates.append(scan_stem[len(archive_name) + 1:])
    return any(_hit(c) for c in candidates)
def parse_doc_frontmatter(filepath: str) -> dict:
    """Parse YAML frontmatter from a doc markdown file.

    Only flat `key: value` pairs are read; indented (nested) lines are
    skipped. Returns {} when no frontmatter delimiters are found.
    """
    text = Path(filepath).read_text(encoding="utf-8")
    if not text.startswith("---"):
        return {}
    end = text.find("---", 3)
    if end == -1:
        return {}
    result = {}
    for raw in text[3:end].strip().split("\n"):
        if raw.startswith(" ") or ":" not in raw:
            continue
        key, _, value = raw.partition(":")
        result[key.strip()] = value.strip()
    return result
def update_doc_frontmatter(filepath: str, updates: dict):
    """Update specific frontmatter fields in a doc file.

    For each (key, val) pair: an empty `key:` line is filled first; if the
    key already has a value, it is overwritten. Keys absent from the
    frontmatter are NOT added. Files without frontmatter are left alone.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        content = f.read()
    if not content.startswith("---"):
        return
    end = content.find("---", 3)
    if end == -1:
        return
    fm_text = content[3:end]
    body = content[end + 3:]
    for key, val in updates.items():
        # Use a callable replacement so backslashes or group references
        # (e.g. r"\1") inside val are inserted literally instead of being
        # interpreted as regex replacement escapes (which corrupted the
        # output, or raised, for such values before).
        repl = lambda m, v=val: f"{m.group(1)} {v}"
        # Try filling an empty field first, then replacing an existing value.
        new_text = re.sub(rf"^({re.escape(key)}:)\s*$", repl, fm_text,
                          flags=re.MULTILINE)
        if new_text == fm_text:
            new_text = re.sub(rf"^({re.escape(key)}:)\s+.*$", repl, fm_text,
                              flags=re.MULTILINE)
        fm_text = new_text
    with open(filepath, "w", encoding="utf-8") as f:
        f.write("---" + fm_text + "---" + body)
| # --------------------------------------------------------------------------- | |
| # Obsidian link helpers | |
| # --------------------------------------------------------------------------- | |
def obsidian_pdf_page_link(scan_filename: str, page_num: int) -> str:
    """Build an Obsidian embed link to a specific page of a PDF."""
    return "![[" + scan_filename + "#page=" + str(page_num) + "]]"
def obsidian_image_embed(image_path: str) -> str:
    """Build an Obsidian embed for an image file."""
    return "![[{}]]".format(image_path)
| # --------------------------------------------------------------------------- | |
| # Main OCR pipeline | |
| # --------------------------------------------------------------------------- | |
def _write_ocr_header(ocr_md_path: str, scan_filename: str, scan_stem: str,
                      total_pages: int, engine_label: str):
    """Write the frontmatter and heading to start an OCR markdown file.

    Overwrites any existing file at ocr_md_path.
    """
    lines = [
        "---",
        f"pdf: \"[[{scan_filename}]]\"",
        "docs:",
        f"total_pages: {total_pages}",
        f"ocr_engine: \"{engine_label}\"",
        "---",
        f"# OCR: {scan_stem}",
        "",
    ]
    with open(ocr_md_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
def _append_ocr_page(ocr_md_path: str, page_num: int, text: str,
                     scan_filename: str):
    """Append a single page's OCR result to the markdown file.

    Empty text is recorded as the "*(no text detected)*" placeholder so
    the page still counts as processed.
    """
    page_text = text if text else "*(no text detected)*"
    section = (
        f"**[[{scan_filename}#page={page_num}|Page {page_num}]]**\n"
        "\n"
        f"{page_text}\n"
        "\n"
        "---\n"
        "\n"
    )
    with open(ocr_md_path, "a", encoding="utf-8") as f:
        f.write(section)
def _count_completed_pages(ocr_md_path: str) -> int:
    """Count how many page headers exist in an OCR markdown file.

    Recognizes both the current "**[[…|Page N]]**" header style and the
    legacy "## Page N" style; returns 0 when the file doesn't exist.
    """
    if not os.path.exists(ocr_md_path):
        return 0
    header_re = re.compile(r"^(\*\*|## ).*Page \d+")
    count = 0
    with open(ocr_md_path, "r", encoding="utf-8") as f:
        for line in f:
            if header_re.match(line):
                count += 1
    return count
def _ocr_page(img_path: str, engine: str, lang: str, api_key: str = None,
              page_retries: int = 3) -> str:
    """OCR a single page image with page-level retries.

    Retries (with exponential backoff) when the engine returns only
    whitespace; after the final attempt, returns whatever was produced.
    """
    text = ""
    for attempt in range(page_retries):
        if engine == "qwen":
            text = ocr_qwen(img_path, api_key=api_key)
        else:
            text = ocr_tesseract(img_path, lang=lang)
        if text.strip():
            break
        if attempt < page_retries - 1:
            wait = 2 ** attempt * 3
            print(f"(empty, retry {attempt+2}/{page_retries} in {wait}s)", end=" ", flush=True)
            time.sleep(wait)
    return text
def ocr_scan(scan_path: str, ocr_dir: str, engine: str, lang: str, dpi: int,
             api_key: str = None) -> str:
    """
    OCR a single scan file (PDF or image).
    Writes each page to disk immediately after processing.
    Resumes from where it left off if partially complete.
    Returns path to generated OCR markdown file.
    """
    scan_path = Path(scan_path)
    scan_stem = scan_path.stem
    scan_filename = scan_path.name
    # PDF page images go into a sibling folder; deleted after a fully
    # verified run (see _verify_and_cleanup).
    pages_dir = os.path.join(ocr_dir, f"{scan_stem}_pages")
    ocr_md_path = os.path.join(ocr_dir, f"{scan_stem}_ocr.md")
    os.makedirs(ocr_dir, exist_ok=True)
    engine_label = f"qwen ({OPENROUTER_MODEL})" if engine == "qwen" else f"tesseract ({lang})"
    total = 1
    if is_pdf_file(str(scan_path)):
        print(f" Extracting pages at {dpi} DPI...")
        page_images = extract_pdf_pages(str(scan_path), pages_dir, dpi=dpi)
        total = len(page_images)
        # Check for partial progress: count page sections already written
        # so an interrupted run can resume instead of starting over.
        done = _count_completed_pages(ocr_md_path)
        if done >= total:
            print(f" All {total} pages already done, skipping.")
            return ocr_md_path
        if done > 0:
            print(f" Resuming from page {done+1} ({done}/{total} already done)...")
        else:
            # Fresh run: start the file with frontmatter + heading.
            _write_ocr_header(ocr_md_path, scan_filename, scan_stem, total, engine_label)
        print(f" {total} pages. Running OCR ({engine})...")
        for i, img_path in enumerate(page_images):
            page_num = i + 1
            if page_num <= done:
                continue  # already present in the output file
            print(f" Page {page_num}/{total}...", end=" ", flush=True)
            text = _ocr_page(img_path, engine, lang, api_key)
            # Append immediately so progress survives interruption.
            _append_ocr_page(ocr_md_path, page_num, text.strip(), scan_filename)
            print(f"({len(text.strip())} chars)")
    elif is_image_file(str(scan_path)):
        # Single-image scan: one page, no resume logic needed.
        print(f" OCRing image ({engine})...")
        _write_ocr_header(ocr_md_path, scan_filename, scan_stem, 1, engine_label)
        text = _ocr_page(str(scan_path), engine, lang, api_key)
        _append_ocr_page(ocr_md_path, 1, text.strip(), scan_filename)
    else:
        print(f" Skipping unsupported file: {scan_filename}")
        return ""
    # Verify output completeness and delete extracted page images if OK.
    _verify_and_cleanup(ocr_md_path, pages_dir, total if is_pdf_file(str(scan_path)) else 1)
    return ocr_md_path
def _verify_and_cleanup(ocr_md_path: str, pages_dir: str, expected_pages: int):
    """Verify OCR output integrity, then delete extracted page images.

    Keeps the page images (so a later run can retry) when the output file
    has fewer page sections than expected, or when any page contains only
    the "*(no text detected)*" placeholder.
    """
    actual = _count_completed_pages(ocr_md_path)
    if actual < expected_pages:
        print(f" WARNING: only {actual}/{expected_pages} pages in output, keeping page images.")
        return
    # Check that no page is empty. Match both header styles that
    # _count_completed_pages recognizes: the current
    # "**[[scan#page=N|Page N]]**" format written by _append_ocr_page and
    # the legacy "## Page N" format. (The previous pattern only matched
    # the legacy form, so empty pages in current files went undetected.)
    with open(ocr_md_path, "r", encoding="utf-8") as f:
        content = f.read()
    empty = [m.group(1) for m in re.finditer(
        r"^(?:\*\*|## ).*?Page (\d+).*\n\s*\*\(no text detected\)\*",
        content, flags=re.MULTILINE,
    )]
    if empty:
        print(f" WARNING: pages {', '.join(empty)} have no text, keeping page images for retry.")
        return
    # All good — delete page images
    if os.path.isdir(pages_dir):
        import shutil
        shutil.rmtree(pages_dir)
    print(f" Verified {actual} pages OK. Cleaned up page images.")
def _update_ocr_docs_field(ocr_md_path: str, doc_stems: list):
    """Replace the docs: field in an OCR file's frontmatter with a list of doc links.

    Rewrites the frontmatter to start with a `pdf:` wiki-link to the scan
    followed by a `docs:` list of wiki-links, preserving all other fields.
    Also migrates the legacy `source_scan:` field to `pdf:`.
    """
    with open(ocr_md_path, "r", encoding="utf-8") as f:
        content = f.read()
    if not content.startswith("---"):
        return
    end = content.find("---", 3)
    if end == -1:
        return
    fm_orig = content[3:end]
    body = content[end + 3:]
    # Recover the scan filename before stripping fields: prefer the legacy
    # source_scan: value, else the pdf: wiki-link written by
    # _write_ocr_header. (Previously only source_scan was checked, so the
    # pdf: link of freshly OCR'd files was silently dropped here.)
    m = re.search(r'source_scan:\s*"?([^"\n]+)"?', fm_orig)
    if not m:
        m = re.search(r'pdf:\s*"?\[\[([^\]\n]+)\]\]"?', fm_orig)
    scan_filename = m.group(1) if m else ""
    # Remove existing docs: field and any indented list items following it
    fm_text = re.sub(r"^docs:.*(?:\n - .*)*\n?", "", fm_orig, flags=re.MULTILINE)
    # Remove existing pdf: and legacy source_scan: fields (re-added below)
    fm_text = re.sub(r"^pdf:.*\n?", "", fm_text, flags=re.MULTILINE)
    fm_text = re.sub(r"^source_scan:.*\n?", "", fm_text, flags=re.MULTILINE)
    # Build new fields and prepend them to the remaining frontmatter
    docs_lines = "\n".join(f" - \"[[{d}]]\"" for d in sorted(doc_stems))
    header = f"pdf: \"[[{scan_filename}]]\"\n" if scan_filename else ""
    header += f"docs:\n{docs_lines}\n"
    fm_text = "\n" + header + fm_text.lstrip("\n")
    with open(ocr_md_path, "w", encoding="utf-8") as f:
        f.write("---" + fm_text + "---" + body)
# Mapping of Chinese digit characters to their Arabic equivalents.
CHINESE_DIGITS = {"〇": "0", "一": "1", "二": "2", "三": "3", "四": "4",
                  "五": "5", "六": "6", "七": "7", "八": "8", "九": "9"}


def _chinese_to_arabic(s: str) -> str:
    """Convert a string of Chinese digit characters to Arabic digits.

    Characters without a digit mapping pass through unchanged.
    """
    converted = []
    for ch in s:
        converted.append(CHINESE_DIGITS.get(ch, ch))
    return "".join(converted)
def _guess_date_from_ocr(ocr_dir: str, scan_stem: str) -> str:
    """Use DeepSeek to extract the document date from OCR text (first 2 pages).

    Returns "YYYY", "YYYY-MM", or "YYYY-MM-DD" on success, or "" when the
    OCR file is missing, no API key is configured, the request fails, or
    the model's reply doesn't validate as a date.
    """
    import requests
    ocr_path = os.path.join(ocr_dir, f"{scan_stem}_ocr.md")
    if not os.path.exists(ocr_path):
        return ""
    with open(ocr_path, "r", encoding="utf-8") as f:
        content = f.read()
    # Extract text from first two pages only — split on the per-page
    # header lines ("**[[…|Page N]]**") written by _append_ocr_page.
    pages = re.split(r"^\*\*\[?\[?.*?Page \d+.*?\]?\]?\*\*", content, flags=re.MULTILINE)
    # pages[0] is frontmatter/header, pages[1] is page 1 text, pages[2] is page 2 text
    text = "\n".join(pages[1:3]) if len(pages) > 1 else content[:3000]
    text = text[:4000]  # cap length to keep the prompt small
    api_key = os.environ.get("OPENROUTER_API_KEY", "")
    if not api_key:
        return ""
    payload = {
        "model": DEEPSEEK_MODEL,
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a date extraction assistant for Chinese archival documents. "
                    "Given OCR text from an archival document, identify the date when "
                    "this document was written or issued. Look for dates in the text — "
                    "they may appear as 1978年4月29日, 一九七八年四月二十九日, 1978.4.29, "
                    "1978,4,29, or similar formats. "
                    "Respond with ONLY the date in YYYY-MM-DD format (e.g. 1978-04-29). "
                    "If only year and month are found, respond YYYY-MM. "
                    "If only year, respond YYYY. "
                    "If no date can be determined, respond with just: NONE"
                ),
            },
            {"role": "user", "content": text},
        ],
        "max_tokens": 32,  # a date (or "NONE") needs only a few tokens
        "temperature": 0,
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    try:
        resp = requests.post(OPENROUTER_URL, json=payload, headers=headers, timeout=30)
        resp.raise_for_status()
        result = resp.json()["choices"][0]["message"]["content"].strip()
        # Validate format: accept YYYY, YYYY-MM, or YYYY-MM-DD only
        # (this also rejects the model's literal "NONE" reply).
        if re.match(r"^\d{4}(-\d{2}(-\d{2})?)?$", result):
            return result
        return ""
    except Exception as e:
        # Best-effort: a failed guess just means the stub doc gets no date.
        print(f" (date guess failed: {e})")
        return ""
def _date_for_filename(date_str: str) -> str:
    """Pad a partial date (YYYY or YYYY-MM) to YYYY-MM-DD form with zeros.

    An empty string becomes "0000-00-00"; strings already containing at
    least three dash-separated parts are returned unchanged.
    """
    if not date_str:
        return "0000-00-00"
    pieces = date_str.split("-")
    if len(pieces) >= 3:
        return date_str
    pieces += ["00"] * (3 - len(pieces))
    return "-".join(pieces)
def _create_doc_stub(docs_dir: str, archive_name: str, scan_stem: str,
                     scan_filename: str, ocr_md_name: str,
                     ocr_dir: str = "") -> str:
    """Create a stub doc file for a scan that has no matching doc.

    Returns the stub's filename (also when it already exists).
    """
    # Strip the archive-name prefix from the scan stem if present
    # (e.g. "BJMA 1-22-371-46" -> "1-22-371-46").
    loc = scan_stem
    prefix = archive_name + " "
    if loc.startswith(prefix):
        loc = loc[len(prefix):]
    # Try to guess a date from the OCR text; missing parts pad to zeros.
    guessed_date = _guess_date_from_ocr(ocr_dir, scan_stem) if ocr_dir else ""
    doc_filename = f"{_date_for_filename(guessed_date)}_{archive_name}_{loc}.md"
    doc_path = os.path.join(docs_dir, doc_filename)
    if os.path.exists(doc_path):
        return doc_filename
    frontmatter = [
        "---",
        "title:",
        f"doc_date: {guessed_date}",
        f"archive: {archive_name}",
        f"archive_loc: {loc}",
        "author:",
        f"pdf: \"[[{scan_filename}]]\"",
        f"ocr: \"[[{ocr_md_name}]]\"",
        "accessed:",
        "transcribe_status: not_started",
        "print_status:",
        "---",
        "",
    ]
    with open(doc_path, "w", encoding="utf-8") as f:
        f.write("\n".join(frontmatter))
    if guessed_date:
        print(f" (guessed date: {guessed_date})")
    return doc_filename
def match_and_update_docs(docs_dir: str, scans_dir: str, ocr_dir: str,
                          archive_name: str):
    """Match each doc to its scan(s) and update frontmatter in both directions.

    - Docs whose archive_loc matches a scan get pdf:/ocr: links written.
    - Scans with no matching doc get a stub doc created for them.
    - Each OCR file's frontmatter gets backlinks to all matched docs.
    Prints a summary of matched/created/unmatched entries at the end.
    """
    os.makedirs(docs_dir, exist_ok=True)
    # Index available scans: stem set for matching, stem -> full filename.
    scan_stems = set()
    scan_filenames = {}
    for f in os.listdir(scans_dir):
        if is_pdf_file(f) or is_image_file(f):
            stem = Path(f).stem
            scan_stems.add(stem)
            scan_filenames[stem] = f
    matched = 0
    unmatched = []
    matched_scans = set()
    # Build reverse map: scan_stem -> list of doc filenames (without .md)
    ocr_to_docs = {}
    for doc_file in sorted(os.listdir(docs_dir)):
        if not doc_file.endswith(".md"):
            continue
        doc_path = os.path.join(docs_dir, doc_file)
        fm = parse_doc_frontmatter(doc_path)
        archive_loc = fm.get("archive_loc", "")
        if not archive_loc:
            continue  # doc has no archival location to match on
        matching = [s for s in scan_stems if scan_matches_doc(s, archive_loc, archive_name)]
        if not matching:
            unmatched.append((doc_file, archive_loc))
            continue
        # Longest matching stem is the most specific scan for this doc.
        best = max(matching, key=len)
        matched_scans.add(best)
        scan_filename = scan_filenames[best]
        ocr_md_name = f"{best}_ocr"
        updates = {
            "pdf": f"\"[[{scan_filename}]]\"",
            "ocr": f"\"[[{ocr_md_name}]]\"",
        }
        update_doc_frontmatter(doc_path, updates)
        matched += 1
        print(f" {doc_file} -> {scan_filename}")
        # Track for OCR backlinks
        doc_stem = doc_file.removesuffix(".md")
        ocr_to_docs.setdefault(best, []).append(doc_stem)
    # Create stub docs for scans with no matching doc
    orphan_scans = scan_stems - matched_scans
    created = 0
    for stem in sorted(orphan_scans):
        scan_filename = scan_filenames[stem]
        ocr_md_name = f"{stem}_ocr"
        doc_filename = _create_doc_stub(docs_dir, archive_name, stem,
                                        scan_filename, ocr_md_name,
                                        ocr_dir=ocr_dir)
        doc_stem = doc_filename.removesuffix(".md")
        ocr_to_docs.setdefault(stem, []).append(doc_stem)
        created += 1
        print(f" Created stub: {doc_filename}")
    # Update OCR files with backlinks to docs
    if os.path.isdir(ocr_dir):
        for scan_stem, doc_stems in ocr_to_docs.items():
            ocr_md_path = os.path.join(ocr_dir, f"{scan_stem}_ocr.md")
            if not os.path.exists(ocr_md_path):
                continue  # scan was never OCR'd (e.g. --link-only run)
            _update_ocr_docs_field(ocr_md_path, doc_stems)
    print(f" Linked {matched} docs to scans.")
    if created:
        print(f" Created {created} stub docs for unmatched scans.")
    if unmatched:
        print(f" {len(unmatched)} docs have no matching scan:")
        for doc_file, loc in unmatched:
            print(f" {doc_file} ({loc})")
| # --------------------------------------------------------------------------- | |
| # CLI | |
| # --------------------------------------------------------------------------- | |
def main():
    """CLI entry point: parse args, OCR every scan in an archive, link docs."""
    parser = argparse.ArgumentParser(
        description="OCR archival scans and link to document metadata.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
engines:
 qwen Qwen3-VL-8B via OpenRouter (default, better quality, needs API key)
 tesseract Local tesseract OCR (fast, free)
""",
    )
    parser.add_argument("archive_folder", nargs="?", default=None,
                        help="Archive folder name (e.g. SHMA) or path")
    parser.add_argument("--engine", choices=["tesseract", "qwen"], default="qwen",
                        help="OCR engine (default: qwen)")
    parser.add_argument("--lang", default="chi_sim+eng",
                        help="Tesseract language(s) (default: chi_sim+eng)")
    parser.add_argument("--dpi", type=int, default=150,
                        help="DPI for PDF page extraction (default: 150)")
    parser.add_argument("--force", action="store_true",
                        help="Re-OCR even if output already exists")
    parser.add_argument("--link-only", action="store_true",
                        help="Skip OCR, only update doc metadata with scan links")
    args = parser.parse_args()
    # List available archives if no argument given, then exit.
    if args.archive_folder is None:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        # An "archive" is any sibling folder containing a scans/ subfolder.
        dirs = sorted([
            d for d in os.listdir(script_dir)
            if os.path.isdir(os.path.join(script_dir, d))
            and os.path.isdir(os.path.join(script_dir, d, "scans"))
        ])
        print("Available archives (folders with scans/):\n")
        for d in dirs:
            scans = [f for f in os.listdir(os.path.join(script_dir, d, "scans"))
                     if is_pdf_file(f) or is_image_file(f)]
            docs_dir = os.path.join(script_dir, d, "docs")
            docs = len([f for f in os.listdir(docs_dir) if f.endswith(".md")]) if os.path.isdir(docs_dir) else 0
            print(f" {d:20s} {len(scans)} scans, {docs} docs")
        if not dirs:
            print(" (none found)")
        print(f"\nUsage: python {os.path.basename(__file__)} <archive_folder> [--engine qwen|tesseract] [--dpi N]")
        sys.exit(0)
    # Resolve archive folder: accept an absolute path or a folder name
    # relative to the script's own directory.
    archive_dir = args.archive_folder
    if not os.path.isabs(archive_dir):
        script_dir = os.path.dirname(os.path.abspath(__file__))
        candidate = os.path.join(script_dir, archive_dir)
        if os.path.isdir(candidate):
            archive_dir = candidate
        else:
            print(f"ERROR: Cannot find archive folder: {archive_dir}")
            sys.exit(1)
    archive_name = os.path.basename(archive_dir)
    scans_dir = os.path.join(archive_dir, "scans")
    docs_dir = os.path.join(archive_dir, "docs")
    ocr_dir = os.path.join(archive_dir, "ocr")
    if not os.path.isdir(scans_dir):
        print(f"ERROR: No scans/ folder in {archive_dir}")
        sys.exit(1)
    scan_files = sorted([
        f for f in os.listdir(scans_dir)
        if is_pdf_file(f) or is_image_file(f)
    ])
    if not scan_files:
        print(f"No scan files found in {scans_dir}")
        sys.exit(1)
    # Validate engine requirements before starting any work.
    api_key = None
    if args.engine == "tesseract" and not TESSERACT:
        print("ERROR: tesseract not found. Install with: brew install tesseract")
        sys.exit(1)
    if args.engine == "qwen":
        api_key = get_openrouter_key()  # exits if no key is configured
    print(f"Archive: {archive_name}")
    print(f"Engine: {args.engine}" + (f" ({args.lang})" if args.engine == "tesseract" else f" ({OPENROUTER_MODEL})"))
    print(f"Scans: {len(scan_files)} files")
    print(f"DPI: {args.dpi}")
    print()
    # OCR every scan (ocr_scan itself skips fully-completed files).
    # NOTE(review): the --force flag is parsed but never consulted here or
    # in ocr_scan — completed scans are always skipped; confirm intended.
    if not args.link_only:
        os.makedirs(ocr_dir, exist_ok=True)
        for i, scan_file in enumerate(scan_files):
            scan_path = os.path.join(scans_dir, scan_file)
            print(f"[{i+1}/{len(scan_files)}] {scan_file}")
            ocr_scan(scan_path, ocr_dir, engine=args.engine, lang=args.lang,
                     dpi=args.dpi, api_key=api_key)
            print()
    # Link docs <-> scans
    print("Linking docs to scans...")
    match_and_update_docs(docs_dir, scans_dir, ocr_dir, archive_name)
    print("\nDone.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment