Created
May 28, 2025 01:04
-
-
Save wesleyel/10d9d1ebccd8093f5f3b5d94162fe3ea to your computer and use it in GitHub Desktop.
Embed image resource in markdown
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S uv --quiet run --script
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "markdown-it-py",
#     "requests",
#     "tqdm",
# ]
# ///
import base64
import concurrent.futures
import mimetypes
import os
import re  # Make sure re is imported
import traceback
from pathlib import Path
from urllib.parse import unquote, urlparse

import requests
from markdown_it import MarkdownIt
# from markdown_it.renderer import RendererHTML # Not used in final regex approach
# from markdown_it.utils import OptionsDict # Not used
from tqdm import tqdm
# --- Runtime configuration -------------------------------------------------

# Size of the thread pool used to process Markdown files concurrently.
MAX_WORKERS: int = 3

# Directory (relative to the working directory) that receives the rewritten files.
OUTPUT_DIR: Path = Path("output")

# Timeout, in seconds, applied to every network request for a remote image.
REQUEST_TIMEOUT: int = 10

# True -> detailed per-image logging; False -> only summaries, warnings and errors.
VERBOSE: bool = False

# Load the MIME type tables up front so later lookups use an initialized database.
mimetypes.init()
# Extension -> MIME type table used when the `mimetypes` database has no answer.
_EXTENSION_MIME_TYPES = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".svg": "image/svg+xml",
    ".webp": "image/webp",
}

# Leading byte signatures of common raster image formats.
_IMAGE_SIGNATURES = (
    (b"\x89PNG\r\n\x1a\n", "image/png"),
    (b"\xff\xd8\xff", "image/jpeg"),
    (b"GIF87a", "image/gif"),
    (b"GIF89a", "image/gif"),
)


def _sniff_image_mimetype(content):
    """Return the MIME type implied by the leading bytes of *content*, or None."""
    if not isinstance(content, (bytes, bytearray, memoryview)):
        return None
    head = bytes(content[:16])
    for signature, mime_type in _IMAGE_SIGNATURES:
        if head.startswith(signature):
            return mime_type
    # WebP is a RIFF container: "RIFF" + 4 size bytes + "WEBP".
    if head[:4] == b"RIFF" and head[8:12] == b"WEBP":
        return "image/webp"
    return None


def get_image_mimetype(image_path_or_url, content=None):
    """Determine the MIME type of an image.

    Lookup order:
      1. the ``mimetypes`` database, keyed on the file name;
      2. a small built-in extension table;
      3. the leading bytes of *content*, when provided;
      4. ``"application/octet-stream"`` as the final default.

    Args:
        image_path_or_url: Local path or http(s) URL of the image.
        content: Optional raw image bytes, used only when the name gives no
            usable extension.

    Returns:
        The MIME type as a string; never None.
    """
    parsed_url = urlparse(image_path_or_url)
    # For remote images only the URL path names the file; a query string or
    # fragment would otherwise end up inside the "extension".
    if parsed_url.scheme in ("http", "https"):
        name_for_guess = parsed_url.path
    else:
        name_for_guess = image_path_or_url

    mime_type, _ = mimetypes.guess_type(name_for_guess)
    if mime_type:
        return mime_type

    ext = Path(name_for_guess).suffix.lower()
    if ext in _EXTENSION_MIME_TYPES:
        return _EXTENSION_MIME_TYPES[ext]

    # No usable extension: fall back to inspecting the data itself.
    if content:
        sniffed = _sniff_image_mimetype(content)
        if sniffed:
            return sniffed

    return "application/octet-stream"
def image_to_base64(image_src, md_file_path: Path):
    """Convert an image (local or remote) to a base64 ``data:`` URI.

    Remote (http/https) sources are downloaded with ``requests``; sources
    without a scheme are treated as paths relative to the directory of
    *md_file_path*. Any other scheme is rejected with a warning.

    Args:
        image_src: The image destination exactly as written in the Markdown.
        md_file_path: Path of the Markdown file that references the image;
            used to resolve relative paths and to label messages.

    Returns:
        A ``(data_uri, message)`` tuple. ``data_uri`` is None on failure.
        ``message`` is an error/warning string on failure, the joined verbose
        log lines on success when VERBOSE is set, and None otherwise.
        Exceptions are not propagated; they are reported through ``message``.
    """
    log_messages = []
    image_data = None
    try:
        parsed_url = urlparse(image_src)
        content_type_header = None

        if parsed_url.scheme in ("http", "https"):
            if VERBOSE:
                log_messages.append(f" Fetching remote: {image_src[:70]}...")
            # The whole body is needed, so no streaming: the connection is
            # released as soon as the request completes, even on HTTP errors.
            response = requests.get(image_src, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            image_data = response.content
            content_type_header = response.headers.get("Content-Type")
        elif not parsed_url.scheme and not parsed_url.netloc:
            base_dir = md_file_path.parent
            local_image_path = (base_dir / image_src).resolve()
            if not local_image_path.is_file():
                # Markdown destinations are often percent-encoded
                # ("my%20image.png"); retry with the decoded name, but only
                # when the literal name does not exist.
                decoded_path = (base_dir / unquote(image_src)).resolve()
                if decoded_path.is_file():
                    local_image_path = decoded_path
            if local_image_path.is_file():
                if VERBOSE:
                    log_messages.append(f" Reading local: {local_image_path.name}")
                with open(local_image_path, "rb") as f:
                    image_data = f.read()
            else:
                return None, f" ERROR: Local image not found: {local_image_path} (referenced in {md_file_path.name})"
        else:
            return None, f" WARNING: Unsupported image scheme: {image_src} (in {md_file_path.name})"

        if image_data:
            base64_encoded_data = base64.b64encode(image_data).decode("utf-8")

            # Trust the server's Content-Type only when it names an image
            # type, and drop parameters such as "; charset=utf-8" that do not
            # belong in the data URI.
            mime_type = None
            if content_type_header:
                header_mime = content_type_header.split(";", 1)[0].strip().lower()
                if header_mime.startswith("image/"):
                    mime_type = header_mime
            if mime_type is None:
                mime_type = get_image_mimetype(image_src, image_data)

            if VERBOSE:
                log_messages.append(f" Encoded {image_src[:50]}... as {mime_type}")
            return (
                f"data:{mime_type};base64,{base64_encoded_data}",
                "\n".join(log_messages) if log_messages else None,
            )
    except requests.exceptions.RequestException as e:
        return None, f" ERROR fetching {image_src[:70]}...: {e} (in {md_file_path.name})"
    except IOError as e:
        return None, f" ERROR reading {image_src}: {e} (in {md_file_path.name})"
    except Exception as e:
        return None, f" ERROR processing {image_src}: {e} (in {md_file_path.name})"

    # Reached only when the source yielded no bytes (e.g. an empty file or an
    # empty response body). Make sure the caller always gets some message.
    final_message = "\n".join(log_messages) if log_messages else None
    if not image_data and not final_message:
        final_message = f" ERROR: Unknown issue processing image {image_src} (in {md_file_path.name})"
    return None, final_message
# Matches Markdown image links: ![alt](src) or ![alt](src "title").
# Groups: 1 = alt text, 2 = image source, 3 = optional title.
_IMAGE_LINK_PATTERN = re.compile(r"!\[(.*?)\]\((.*?)(?: \"(.*?)\")?\)")


def process_markdown_file(md_file_path: Path, output_base_dir: Path):
    """Embed the images of one Markdown file and write the result to the output tree.

    Every ``![alt](src "title")`` link whose source is not already a
    ``data:`` URI is passed to ``image_to_base64``. On success the link is
    rewritten to point at the data URI (alt text and title are kept); on
    failure the original link is left untouched. The rewritten file is saved
    under *output_base_dir*, mirroring the file's path relative to the current
    working directory.

    Args:
        md_file_path: Markdown file to process; must be located under the
            current working directory (``relative_to`` is used on it).
        output_base_dir: Root directory for the rewritten files.

    Returns:
        ``(success, messages)``: ``success`` is False only when the file as a
        whole could not be processed; ``messages`` is a list of
        human-readable log lines (errors, warnings and a per-file summary).
    """
    file_operation_messages = []
    if VERBOSE:
        file_operation_messages.append(f"Starting processing: {md_file_path}")

    try:
        relative_path = md_file_path.relative_to(Path.cwd())
        output_file_path = output_base_dir / relative_path
        output_file_path.parent.mkdir(parents=True, exist_ok=True)

        with open(md_file_path, "r", encoding="utf-8") as f:
            content = f.read()

        processed_image_links_count = 0
        successfully_embedded_count = 0

        def replacer(match):
            """Return the replacement text for a single image link match."""
            nonlocal processed_image_links_count, successfully_embedded_count
            alt_text = match.group(1)
            original_src = match.group(2)
            title_text = match.group(3) if match.group(3) else ""

            if original_src.startswith("data:"):  # Already embedded
                return match.group(0)

            processed_image_links_count += 1
            if VERBOSE:
                file_operation_messages.append(
                    f" Found image link: {original_src[:70]}... in {md_file_path.name}"
                )

            base64_uri, image_status_msg = image_to_base64(original_src, md_file_path)

            # Errors and warnings are always reported; plain progress
            # messages only in verbose mode.
            status_is_problem = bool(image_status_msg) and (
                "ERROR" in image_status_msg.upper() or "WARNING" in image_status_msg.upper()
            )
            if image_status_msg and (VERBOSE or status_is_problem):
                file_operation_messages.append(image_status_msg)

            if base64_uri:
                successfully_embedded_count += 1
                # Rebuild the link around the data URI, preserving alt text
                # and the optional title.
                title_part = f' "{title_text}"' if title_text else ""
                new_tag = f"![{alt_text}]({base64_uri}{title_part})"
                if VERBOSE:
                    file_operation_messages.append(f" Embedded: {original_src[:50]}...")
                return new_tag

            # Embedding failed. Add a generic warning only when
            # image_to_base64 did not already report a specific problem, so
            # long (truncated) URLs do not produce duplicate messages.
            if not status_is_problem:
                file_operation_messages.append(
                    f" WARNING: Failed to embed {original_src[:70]}... (in {md_file_path.name}). Kept original."
                )
            return match.group(0)  # Keep the original link untouched

        modified_content = _IMAGE_LINK_PATTERN.sub(replacer, content)

        with open(output_file_path, "w", encoding="utf-8") as f:
            f.write(modified_content)

        # Summary for this file
        summary_msg = f"Finished: {md_file_path.name}."
        if processed_image_links_count > 0:
            summary_msg += f" (Found: {processed_image_links_count}, Embedded: {successfully_embedded_count}"
            failures = processed_image_links_count - successfully_embedded_count
            if failures > 0:
                summary_msg += f", Failed/Skipped: {failures}"
            summary_msg += ")"
        elif VERBOSE:  # Only if verbose and no images were found
            summary_msg += " (No image links applicable for embedding)."
        file_operation_messages.append(summary_msg)
        return True, file_operation_messages
    except Exception as e:
        # Per-file boundary: report the failure instead of aborting the batch.
        err_msg = f" Major ERROR processing file {md_file_path.name}: {e}"
        if VERBOSE:
            err_msg += "\n" + traceback.format_exc()
        file_operation_messages.append(err_msg)
        return False, file_operation_messages
def _find_markdown_files(root: Path, excluded_dir: Path):
    """Return the Markdown files below *root* that are not inside *excluded_dir*."""
    excluded = excluded_dir.resolve()
    return [
        p
        for p in root.rglob("*.md")
        # Compare path components, not string prefixes: a plain startswith()
        # would also drop siblings such as "output_old/". Directories that
        # merely end in ".md" are skipped as well.
        if p.is_file() and not p.resolve().is_relative_to(excluded)
    ]


def main():
    """Find all Markdown files below the working directory and embed their images.

    Files are processed concurrently in a thread pool of MAX_WORKERS threads.
    Per-file messages are printed through ``tqdm.write`` so they do not break
    the progress bar, and a summary is printed at the end. Files located
    inside OUTPUT_DIR are never treated as input.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    md_files = _find_markdown_files(Path.cwd(), OUTPUT_DIR)

    if not md_files:
        print("No Markdown files found in the current directory (excluding 'output' directory).")
        return

    print(f"Found {len(md_files)} Markdown files to process. VERBOSE output is {'ON' if VERBOSE else 'OFF'}.")

    successful_count = 0
    failed_count = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_file = {
            executor.submit(process_markdown_file, md_file, OUTPUT_DIR): md_file
            for md_file in md_files
        }
        for future in tqdm(
            concurrent.futures.as_completed(future_to_file),
            total=len(md_files),
            desc="Embedding Images",
        ):
            md_file = future_to_file[future]
            try:
                success, messages = future.result()
                for msg in messages:
                    # In verbose mode, prefix messages that do not already
                    # mention the file so their origin stays clear.
                    if md_file.name not in msg and VERBOSE:
                        tqdm.write(f"[{md_file.name}] {msg}")
                    else:
                        tqdm.write(msg)
                if success:
                    successful_count += 1
                else:
                    # The error message is already part of `messages`.
                    failed_count += 1
            except Exception as exc:
                # Safety net for anything process_markdown_file did not handle itself.
                tqdm.write(f" Critical unhandled exception for {md_file.name}: {exc}")
                failed_count += 1
                if VERBOSE:
                    tqdm.write(traceback.format_exc())

    print("\n--- Summary ---")
    print(f"Successfully processed files: {successful_count}")
    print(f"Files with errors/failures: {failed_count}")
    print(f"Output files are in: {OUTPUT_DIR.resolve()}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment