|
#!/usr/bin/env python3 |
|
import json |
|
import sys |
|
import subprocess |
|
import tempfile |
|
import shutil |
|
from pathlib import Path |
|
from typing import Union, List |
|
import logging |
|
|
|
# Emit INFO-and-above records formatted as "LEVEL: message".
logging.basicConfig(
    format='%(levelname)s: %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
|
|
|
def clear_notebook_outputs(notebook_path: Path) -> None:
    """Strip outputs and execution counts from a notebook, rewriting it in place.

    Every cell whose ``cell_type`` is ``'code'`` gets ``outputs`` set to an
    empty list and ``execution_count`` set to ``None``; all other cells are
    left untouched.  The notebook is then written back to the same path as
    UTF-8 JSON with a two-space indent and non-ASCII characters preserved.

    Args:
        notebook_path: Path of the ``.ipynb`` file to modify.
    """
    with notebook_path.open('r', encoding='utf-8') as reader:
        document = json.load(reader)

    for entry in document.get('cells', []):
        if entry.get('cell_type') != 'code':
            continue
        # A single update keeps the same key order as two separate assignments.
        entry.update(outputs=[], execution_count=None)

    with notebook_path.open('w', encoding='utf-8') as writer:
        json.dump(document, writer, indent=2, ensure_ascii=False)
|
|
|
def convert_notebook_to_python(notebook_path: Union[str, Path], output_path: Union[str, Path, None] = None) -> Path:
    """Write the code and markdown cells of a notebook to a ``.py`` file.

    Code cells are copied verbatim and markdown cells are turned into ``#``
    comment lines.  Cells that are empty, of any other type, or that carry no
    ``cell_type`` key are skipped.  Each emitted cell is followed by blank
    lines so that cells stay visually separated in the output.

    Args:
        notebook_path: Path to the ``.ipynb`` file.
        output_path: Destination file.  When omitted (or falsy), the resolved
            notebook path with its suffix replaced by ``.py`` is used.

    Returns:
        The path of the file that was written.

    Raises:
        FileNotFoundError: If the notebook does not exist.
        json.JSONDecodeError: If the notebook file is not valid JSON.
    """
    notebook_path = Path(notebook_path).resolve()
    if not notebook_path.exists():
        raise FileNotFoundError(f"Notebook not found: {notebook_path}")

    output_path = Path(output_path) if output_path else notebook_path.with_suffix('.py')

    with notebook_path.open('r', encoding='utf-8') as f:
        try:
            notebook = json.load(f)
        except json.JSONDecodeError as e:
            # Use e.msg rather than str(e): JSONDecodeError appends the
            # line/column/char position itself, so str(e) would repeat it.
            raise json.JSONDecodeError(
                f"Invalid notebook format in {notebook_path}: {e.msg}", e.doc, e.pos
            ) from e

    python_code = []
    for cell in notebook.get('cells', []):
        # .get() so a cell without a 'cell_type' key is skipped, not a KeyError.
        cell_type = cell.get('cell_type')
        if cell_type not in ('code', 'markdown'):
            continue

        source = ''.join(cell.get('source', []))
        if not source.strip():
            continue

        if cell_type == 'code':
            python_code.append(source)
        else:
            # Blank markdown lines become a bare '#' to avoid trailing spaces.
            python_code.extend(
                f'# {line}' if line.strip() else '#'
                for line in source.splitlines()
            )
        python_code.append('\n')

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text('\n'.join(python_code), encoding='utf-8')
    return output_path
|
|
|
def process_directory(directory_path: Union[str, Path]) -> List[Path]:
    """Clear and convert every notebook found under a directory.

    Each ``.ipynb`` file below *directory_path* (excluding anything inside an
    ``.ipynb_checkpoints`` directory) has its outputs cleared in place and is
    then converted to a ``.py`` file with the same stem in the same directory.
    A failure on one notebook is logged and does not stop the others.

    A notebook is skipped, untouched, when a ``.py`` file with the same stem
    already exists beside it, so that a hand-written or paired source file is
    never overwritten by generated output.

    Args:
        directory_path: Root directory to search recursively.

    Returns:
        The ``.py`` files generated by this call.

    Raises:
        NotADirectoryError: If *directory_path* is not a directory.
    """
    directory_path = Path(directory_path).resolve()
    if not directory_path.is_dir():
        raise NotADirectoryError(f"Not a directory: {directory_path}")

    converted_files: List[Path] = []
    # sorted() snapshots the matches before any file is created and gives a
    # deterministic processing order.
    for notebook_path in sorted(directory_path.glob('**/*.ipynb')):
        if '.ipynb_checkpoints' in notebook_path.parts:
            continue

        # Keep the output beside the path we globbed (not a symlink's target)
        # so it stays inside the tree that later steps scan.
        output_path = notebook_path.with_suffix('.py')
        if output_path.exists():
            logger.warning(f"Skipping {notebook_path}: {output_path} already exists")
            continue

        try:
            clear_notebook_outputs(notebook_path)
            output_path = convert_notebook_to_python(notebook_path, output_path)
        except Exception as e:
            # Best effort: report the notebook that failed and carry on.
            logger.error(f"Failed to convert {notebook_path}: {e}")
        else:
            converted_files.append(output_path)
            logger.info(f"Converted: {notebook_path} -> {output_path}")

    return converted_files
|
|
|
def create_python_only_copy(source_dir: Path, temp_dir: Path) -> None:
    """Mirror every ``.py`` file under *source_dir* into *temp_dir*.

    The relative directory layout is preserved and files are copied with
    ``shutil.copy2``.  Files that are not ``.py`` are ignored.

    The list of files is collected before anything is copied, and files that
    already live under *temp_dir* are excluded, so passing a destination that
    is located inside *source_dir* cannot make the copy feed on its own
    output.

    Args:
        source_dir: Root of the tree to copy from.
        temp_dir: Root of the tree to copy into; missing directories are
            created as needed.
    """
    destination_root = temp_dir.resolve()

    # Snapshot first: copying while rglob() is still walking the tree would
    # let newly created files show up in the walk.
    python_files = [
        item
        for item in source_dir.rglob('*')
        if item.is_file()
        and item.suffix == '.py'
        and destination_root not in item.resolve().parents
    ]

    for item in python_files:
        dest_path = temp_dir / item.relative_to(source_dir)
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(item, dest_path)
|
|
|
def run_gitingest(directory_path: Path) -> str:
    """Run gitingest on a directory and return its captured standard output.

    Two invocations are tried in order: ``<this interpreter> -m gitingest``
    and then the ``gitingest`` executable found on ``PATH``.  The first one
    that exits successfully wins.  If both fail, the last failure is logged
    and re-raised.

    NOTE(review): some gitingest versions write the digest to a file unless
    told otherwise, in which case stdout holds only a summary - confirm
    against the installed version.

    Args:
        directory_path: Directory to hand to gitingest.

    Returns:
        The text gitingest wrote to standard output.

    Raises:
        subprocess.CalledProcessError: If the last attempt ran but exited
            with a non-zero status.
        FileNotFoundError: If the last attempt's executable was not found.
    """
    # sys.executable runs the module under the interpreter (and environment)
    # this script is running in; a bare 'python' may be a different install
    # or may not exist at all.
    commands = (
        [sys.executable or 'python', '-m', 'gitingest', str(directory_path)],
        ['gitingest', str(directory_path)],
    )

    last_error = None
    for command in commands:
        try:
            result = subprocess.run(command, capture_output=True, text=True, check=True)
        except (subprocess.CalledProcessError, FileNotFoundError) as error:
            # Remember why this attempt failed and fall through to the next.
            last_error = error
            continue
        return result.stdout

    if isinstance(last_error, subprocess.CalledProcessError):
        logger.error(f"gitingest failed: {last_error.stderr}")
    else:
        logger.error("gitingest not found. Please install it first: pip install git+https://github.com/cyclotruc/gitingest.git")
    raise last_error
|
|
|
def cleanup_converted_files(directory_path: Path) -> None:
    """Delete ``.py`` files that sit next to a notebook of the same name.

    The whole tree under *directory_path* is searched.  A ``.py`` file is
    removed whenever a file with the same stem and an ``.ipynb`` suffix exists
    in the same directory; each removal is logged.

    Note that the check is purely name-based: any ``.py`` file with a sibling
    notebook is deleted, regardless of how that file came to exist.

    Args:
        directory_path: Root directory to search recursively.
    """
    for py_file in directory_path.rglob('*.py'):
        sibling_notebook = py_file.with_suffix('.ipynb')
        if not sibling_notebook.exists():
            continue
        py_file.unlink()
        logger.info(f"Deleted: {py_file}")
|
|
|
def main():
    """Command-line entry point: ``nbingest [<directory>]``.

    Converts the notebooks under the target directory (the current working
    directory when no argument is given) to ``.py`` files, copies all ``.py``
    files into a temporary directory, runs gitingest on that copy and prints
    its output.  The ``.py`` files generated by this run are removed again
    afterwards, whether or not the run succeeded.

    Exits with status 1 on a usage error, when the target is not a directory,
    or when any step raises.
    """
    if len(sys.argv) == 1:
        target = Path.cwd()
    elif len(sys.argv) == 2:
        target = Path(sys.argv[1])
    else:
        logger.error("Usage: nbingest [<directory>]")
        sys.exit(1)

    if not target.is_dir():
        logger.error(f"Target must be a directory: {target}")
        sys.exit(1)

    converted_files: List[Path] = []
    try:
        logger.info("Converting notebooks to Python files...")
        converted_files = process_directory(target)

        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            logger.info("Creating Python-only copy...")
            create_python_only_copy(target, temp_path)

            logger.info("Running gitingest on Python files...")
            output = run_gitingest(temp_path)
            print(output)
    except Exception as e:
        logger.error(f"Error: {e}")
        sys.exit(1)
    finally:
        # Runs on success and on failure (including the sys.exit above), so a
        # failed gitingest call does not leave generated files behind.  Only
        # files produced by this run are removed; other .py files that merely
        # share a name with a notebook are left alone.
        logger.info("Cleaning up converted files...")
        for py_file in converted_files:
            try:
                py_file.unlink()
            except FileNotFoundError:
                continue
            except OSError as e:
                # Do not let a cleanup problem mask the real outcome.
                logger.warning(f"Could not delete {py_file}: {e}")
                continue
            logger.info(f"Deleted: {py_file}")
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()