
@CGamesPlay
Created December 14, 2024 00:58
Migrate an Obsidian vault (or any Markdown directory) to the Anytype Markdown import format
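
The script takes a source directory (the Obsidian vault) and a target directory, and writes the converted files there under content-hash names along with a file_hashes.json map. Because the mistune dependency is declared inline via PEP 723 metadata, a PEP 723-aware runner such as uv can execute the script directly, for example:

    uv run import.py <source_directory> <target_directory>
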
#!/usr/bin/env python3
# /// script
# dependencies = [
# "mistune==3.0.2",
# ]
# ///
import os
import sys
import hashlib
import json
from collections import defaultdict
import mistune
from mistune.renderers.markdown import MarkdownRenderer
import mistune.renderers._list as mistune_list
from pathlib import Path
from urllib.parse import unquote
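
# mistune 3's MarkdownRenderer appears to drop the "[x]" / "[ ]" checkboxes
# produced by the task_lists plugin when rendering the AST back to Markdown,
# so patch its private list-item helper to re-emit them. The rest of the
# helper mirrors the stock implementation (the original is kept in `orig`).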
orig = mistune_list._render_list_item


def render_list_item(renderer, parent, item, state):
    leading = parent["leading"]
    checkbox = ""
    text = ""
    if item["type"] == "task_list_item":
        if item["attrs"]["checked"]:
            checkbox = "[x] "
        else:
            checkbox = "[ ] "
    for tok in item["children"]:
        if tok["type"] == "list":
            tok["parent"] = parent
        elif tok["type"] == "blank_line":
            continue
        text += renderer.render_token(tok, state)
    lines = text.splitlines()
    text = (lines[0] if lines else "") + "\n"
    prefix = " " * len(leading)
    for line in lines[1:]:
        if line:
            text += prefix + line + "\n"
        else:
            text += "\n"
    return leading + checkbox + text


mistune_list._render_list_item = render_list_item
def calculate_file_hash(filepath):
    """Calculate SHA-256 hash of a file."""
    sha256_hash = hashlib.sha256()
    with open(filepath, "rb") as f:
        # Read the file in chunks to handle large files efficiently
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()


def scan_directory(directory):
    """Recursively scan directory and create a map of filenames to hashes."""
    file_map = {}
    directory_path = Path(directory)
    if not directory_path.exists():
        print(f"Error: Directory '{directory}' does not exist")
        sys.exit(1)
    for filepath in directory_path.rglob("*"):
        if filepath.stem.startswith("."):
            continue
        if any(p.stem.startswith(".") for p in filepath.parents):
            continue
        if filepath.is_file():
            try:
                relative_path = str(filepath.relative_to(directory_path))
                file_hash = calculate_file_hash(filepath)
                if (
                    file_hash
                    == "d0a69367f54ae3c9687a8deb632c85d2dd6e617a273b38f3d5725b81ac9c13ca"
                ):
                    # The file hash of the default, unmodified directory index file
                    continue
                file_map[relative_path] = f"{file_hash}{filepath.suffix}"
            except Exception as e:
                print(f"Error processing {filepath}: {e}")
    return file_map
def process_link(node, file_map, file_path, is_image=False):
    """Convert a relative link to the hashed URL, and potentially split into text + link for AnyType."""
    url = unquote(node["attrs"]["url"])
    is_external = url.startswith(
        (
            "http://",
            "https://",
            "#",
            "mailplane://",
            "things://",
            "mailto:",
            "tel:",
        )
    )
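    # A link gets rewritten as "text (link)" when its display text would
    # otherwise be lost: external URLs, and internal links whose text is not
    # simply the target note's name. (Assumption: Anytype's Markdown import
    # replaces the text of links to other imported objects with the target's
    # title, so custom link text has to be moved outside the link to survive.)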
    need_rewrite = not is_image and (
        is_external
        or len(node["children"]) == 1
        and node["children"][0].get("raw") != Path(url).stem
    )
    if not is_external:
        # Convert relative path to absolute
        try:
            abs_path = os.path.normpath(str(file_path.parent / url))
            # Replace with hash link
            node["attrs"]["url"] = f"{file_map[abs_path]}"
        except (ValueError, KeyError, RuntimeError):
            # Keep original if path resolution fails
            print(f"WARN: {file_path}: broken link to {url}")
    if need_rewrite:
        text = node["children"]
        node["children"] = [{"type": "text", "raw": "link"}]
        return [
            *text,
            {"type": "text", "raw": " ("},
            node,
            {"type": "text", "raw": ")"},
        ]
    else:
        return [node]
def process_ast(node, file_map, file_path, top=False):
    """Recursively process AST nodes and convert file links to hash links."""
    if isinstance(node, list):
        if top and (
            len(node) == 0
            or node[0].get("type") != "heading"
            or node[0].get("attrs", {}).get("level") != 1
        ):
            node.insert(
                0,
                {
                    "type": "heading",
                    "attrs": {"level": 1},
                    "children": [{"type": "text", "raw": file_path.stem}],
                },
            )
        return [process_ast(child, file_map, file_path) for child in node]
    if isinstance(node, dict):
        if node.get("type") == "image":
            process_link(node, file_map, file_path, is_image=True)
        if isinstance(node.get("children"), list):
            # For any child of type "link", call process_link and splice the
            # returned list in place of the original node.
            node["children"] = [
                child
                for sublist in [
                    process_link(child, file_map, file_path)
                    if child.get("type") == "link"
                    else [child]
                    for child in node["children"]
                ]
                for child in sublist
            ]
        return {k: process_ast(v, file_map, file_path) for k, v in node.items()}
    return node
def preserve_timestamps(source_dir, target_dir, file_map):
    """Copy original file timestamps to the processed files."""
    source_path = Path(source_dir)
    target_path = Path(target_dir)
    for orig_path, hash_name in file_map.items():
        source_file = source_path / orig_path
        target_file = target_path / hash_name
        if source_file.exists() and target_file.exists():
            # Get original timestamps
            stat = source_file.stat()
            # Set access and modification times on the new file
            os.utime(target_file, (stat.st_atime, stat.st_mtime))
def main():
    if len(sys.argv) != 3:
        print("Usage: python import.py <source_directory> <target_directory>")
        sys.exit(1)

    source_dir = sys.argv[1]
    target_dir = sys.argv[2]

    # Create hash map of files
    file_map = scan_directory(source_dir)

    # Process markdown files and save their ASTs
    markdown_ast = mistune.create_markdown(renderer=None, plugins=["task_lists"])
    ast_markdown = MarkdownRenderer()
    source_path = Path(source_dir)
    target_path = Path(target_dir)
    target_path.mkdir(parents=True, exist_ok=True)
    for filepath, file_hash in file_map.items():
        if filepath.lower().endswith(".md"):
            full_path = source_path / filepath
            with open(full_path, "r", encoding="utf-8") as f:
                content = f.read()
            results, state = markdown_ast.parse(content)
            # Process links in the AST
            processed_results = process_ast(results, file_map, Path(filepath), top=True)
            rendered = ast_markdown(processed_results, state)
            ast_markdown.verbose = False
            # Save processed markdown to target directory
            ast_path = target_path / f"{file_hash}"
            ast_path.write_text(rendered, encoding="utf-8")
        else:
            # For non-markdown files, copy directly to target with hash filename
            source_file = source_path / filepath
            target_file = target_path / f"{file_hash}"
            target_file.write_bytes(source_file.read_bytes())

    # Create directory listings
    directories = defaultdict(list)

    # First collect all directories
    all_dirs = set()
    for filepath in file_map:
        path = Path(filepath)
        current = path.parent
        while current != Path("."):
            all_dirs.add(str(current))
            current = current.parent

    # Then collect files for each directory
    for filepath, hash_name in file_map.items():
        path = Path(filepath)
        if path.parent != Path("."):
            directories[str(path.parent)].append((path.name, hash_name))

    # Sort directories by depth (deepest first)
    sorted_dirs = sorted(all_dirs, key=lambda x: len(Path(x).parts), reverse=True)

    # Create and save directory listing files
    for directory in sorted_dirs:
        content = [f"# {Path(directory).stem}\n\n"]
        files = directories[directory]
        # Add all files in this directory
        for filename, hash_name in sorted(files):
            content.append(f"- [{filename}]({hash_name})\n")
        # Calculate hash of the listing content
        listing_content = "".join(content)
        listing_hash = hashlib.sha256(listing_content.encode()).hexdigest()
        # Save the listing file
        listing_path = target_path / f"{listing_hash}.md"
        listing_path.write_text(listing_content, encoding="utf-8")
        # Add the listing to the file map and directories list
        index_path = f"{directory}/_index.md"
        file_map[index_path] = f"{listing_hash}.md"
        # Add this index to parent directory's file list
        parent = str(Path(directory).parent)
        if parent != ".":
            directories[parent].append(("_index.md", f"{listing_hash}.md"))

    # Create master directory listing
    master_content = ["# Directory Index\n\n"]
    for directory in sorted(all_dirs):
        index_path = f"{directory}/_index.md"
        if index_path in file_map:
            master_content.append(f"- [{directory}]({file_map[index_path]})\n")
    master_listing = "".join(master_content)
    master_hash = hashlib.sha256(master_listing.encode()).hexdigest()
    master_path = target_path / f"{master_hash}.md"
    master_path.write_text(master_listing, encoding="utf-8")
    file_map["_index.md"] = f"{master_hash}.md"
    # Create archived docs listing
    master_content = ["# Everything Archived\n\n"]
    for filepath, hash_name in file_map.items():
        if Path(filepath).parts[0] == "4 Archive":
            master_content.append(f" - [{filepath}]({hash_name})\n")
    master_listing = "".join(master_content)
    master_hash = hashlib.sha256(master_listing.encode()).hexdigest()
    master_path = target_path / f"{master_hash}.md"
    master_path.write_text(master_listing, encoding="utf-8")
    file_map["_archive.md"] = f"{master_hash}.md"

    # Save the map to a JSON file in the target directory
    hash_file = target_path / "file_hashes.json"
    with open(hash_file, "w") as f:
        json.dump(file_map, f, indent=2)

    print(f"Processed {len(file_map)} files")
    print(f"Hash map saved to {hash_file}")

    # Preserve original timestamps
    preserve_timestamps(source_dir, target_dir, file_map)
if __name__ == "__main__":
    main()