Last active
March 27, 2026 01:30
-
-
Save jasonrdsouza/456fcf9394615d9f72bc1792b6417a2d to your computer and use it in GitHub Desktop.
Script to quickly search through Excel files from the command line
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Search Excel files for rows containing a given value.""" | |
| import argparse | |
| import re | |
| import sys | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| import polars as pl | |
def normalize(s: str) -> str:
    """Return *s* prepared for fuzzy matching.

    The text is lowercased, every comma is removed, each run of whitespace
    is collapsed to a single space, and leading/trailing whitespace is
    trimmed.
    """
    without_commas = s.lower().replace(",", "")
    return re.sub(r"\s+", " ", without_commas).strip()
def search_file(
    filepath: Path, search_value: str, column_filter: str | None, fuzzy: bool = False
) -> list[tuple[str, int, list[tuple[str, str, bool]]]]:
    """Search a single Excel file, returning matched rows.

    Args:
        filepath: Path of the workbook to read.
        search_value: Text to look for. It is compared against lowercased
            cell text, so the caller must pass it already lowercased (and
            already run through ``normalize`` when ``fuzzy`` is true).
        column_filter: When given, only columns whose lowercased header
            contains this string are searched.
        fuzzy: When true, commas are removed and whitespace is collapsed in
            cell text before the substring test.

    Returns:
        A list of (filename, row_number, [(col, val, is_match), ...]) tuples,
        one per matching row. ``row_number`` is the frame's 0-based row index
        plus 2 (1-based, plus the header row). Cells that are null or blank
        are left out of the field list. ``is_match`` is evaluated for every
        listed column, not only the searched ones. A file that cannot be
        read, or that has no columns to search, yields an empty list.
    """
    try:
        # NOTE(review): infer_schema_length=0 presumably makes polars load
        # every column as text -- confirm against the polars documentation.
        df = pl.read_excel(filepath, engine="calamine", infer_schema_length=0)
    except Exception as e:
        # Best effort: one unreadable workbook must not abort the whole search.
        print(f"[WARN] Could not open {filepath.name}: {e}", file=sys.stderr)
        return []

    # Determine which columns to search.
    if column_filter:
        search_cols = [c for c in df.columns if column_filter in c.lower()]
    else:
        search_cols = list(df.columns)
    if not search_cols:
        # Covers both "filter matched no header" and a frame without columns;
        # the latter used to raise IndexError when building the mask.
        return []

    # Build one boolean expression: per-column substring test, OR-ed together.
    mask = None
    for col in search_cols:
        cell_text = pl.col(col).cast(pl.Utf8).str.to_lowercase()
        if fuzzy:
            # Mirror normalize(): drop commas, collapse whitespace runs, trim.
            cell_text = (
                cell_text.str.replace_all(",", "")
                .str.replace_all(r"\s+", " ")
                .str.strip_chars()
            )
        hit = cell_text.str.contains(search_value, literal=True)
        mask = hit if mask is None else mask | hit

    # Tag rows with their original position before filtering so the reported
    # row number still refers to the sheet, not to the filtered frame.
    indexed = df.with_row_index("__orig_idx")

    results: list[tuple[str, int, list[tuple[str, str, bool]]]] = []
    for row in indexed.filter(mask).iter_rows(named=True):
        row_num = row["__orig_idx"] + 2  # +2 for 1-based + header
        fields: list[tuple[str, str, bool]] = []
        for col in df.columns:
            val = row[col]
            if val is None:
                continue
            text = str(val)
            if not text.strip():
                continue
            # Use the same text the mask saw: the non-fuzzy mask does not
            # trim the cell, so the highlight check must not trim it either.
            haystack = normalize(text) if fuzzy else text.lower()
            fields.append((col, text, search_value in haystack))
        results.append((filepath.name, row_num, fields))
    return results
def main() -> None:
    """Command-line entry point.

    Parses the arguments, searches every ``*.xlsx`` file directly inside the
    chosen directory, and prints each matching row followed by a summary
    line. Exits with status 1 when the directory does not exist or contains
    no ``.xlsx`` files.
    """
    cli = argparse.ArgumentParser(
        description="Search all .xlsx files in a directory for rows containing a given value."
    )
    cli.add_argument(
        "search_value",
        help="Value to search for (case-insensitive substring match)",
    )
    cli.add_argument(
        "--column",
        help="Restrict search to columns whose header contains this string (case-insensitive)",
    )
    cli.add_argument(
        "--dir",
        type=Path,
        default=Path.cwd(),
        help="Directory to search (default: current working directory)",
    )
    cli.add_argument(
        "--fuzzy",
        action="store_true",
        help="Fuzzy match: ignore commas and extra whitespace",
    )
    opts = cli.parse_args()

    # The search term is prepared once here, the same way cell text is
    # prepared during the search.
    if opts.fuzzy:
        needle = normalize(opts.search_value)
    else:
        needle = opts.search_value.lower()
    header_filter = opts.column.lower() if opts.column else None

    target_dir = opts.dir.resolve()
    if not target_dir.is_dir():
        print(f"Error: {target_dir} is not a directory.", file=sys.stderr)
        sys.exit(1)

    workbooks = sorted(target_dir.glob("*.xlsx"))
    if not workbooks:
        print("No .xlsx files found.")
        sys.exit(1)

    # Fan the files out across a thread pool. Leaving the ``with`` block
    # waits for every job, and results are then read back in submission
    # order so the output follows the sorted file list.
    with ThreadPoolExecutor() as pool:
        jobs = [
            pool.submit(search_file, workbook, needle, header_filter, opts.fuzzy)
            for workbook in workbooks
        ]
    hits_per_file = [job.result() for job in jobs]

    match_count = 0
    for hits in hits_per_file:
        for filename, row_num, fields in hits:
            match_count += 1
            print(f"\n{'=' * 80}")
            print(f"File: {filename} | Row: {row_num}")
            print(f"{'=' * 80}")
            for col, val, is_match in fields:
                if is_match:
                    display = f">>> {val} <<<"
                else:
                    display = val
                print(f" {col}: {display}")

    print(f"\n--- {match_count} matching row(s) found across {len(workbooks)} file(s) ---")
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment