#!/usr/bin/env python

"""
Modifies the body content of Quarto-flavored markdown (.qmd) files using
ChatGPT (via Azure OpenAI), leaving the YAML metadata unchanged.

Annotates the YAML metadata with an 'llmupdated' date to avoid reprocessing
files that have not changed, and reports the total input and output tokens
used.
"""

import asyncio
import logging
import os
import random
import re
import sys
from datetime import datetime
from pathlib import Path

import tiktoken
from dotenv import load_dotenv
from openai import APIError, AsyncAzureOpenAI
from ruamel.yaml import YAML

# Maximum number of concurrent API calls; adjust to your API's rate limits.
CONCURRENCY_LIMIT = 5

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from .env if available
load_dotenv()
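# (Note: by default python-dotenv does not override variables that are already
# set in the environment.)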

# Configuration variables
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_API_ENDPOINT = os.getenv(
    "AZURE_OPENAI_ENDPOINT", "https://your_openai_endpoint.azure.com/"
)
AZURE_API_VERSION = os.getenv("AZURE_API_VERSION", "2024-02-15-preview")
AZURE_DEPLOYMENT_NAME = os.getenv("AZURE_DEPLOYMENT_NAME", "my-deployment")

# Initialize the Azure OpenAI client
client = AsyncAzureOpenAI(
    api_key=AZURE_OPENAI_API_KEY,
    api_version=AZURE_API_VERSION,
    azure_endpoint=AZURE_API_ENDPOINT,
)
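# Note: with the Azure client, the `model` argument passed to
# chat.completions.create below names the *deployment*, not the underlying
# model.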

# Initialize the token encoder. cl100k_base may not match the deployed model
# exactly, but a fixed, known encoding avoids lookup failures for unfamiliar
# deployment names; the counts are estimates either way.
encoder = tiktoken.get_encoding("cl100k_base")

# Initialize YAML parser
yaml = YAML()
yaml.preserve_quotes = True  # Preserve quotes in YAML

# Model limits (informational; the chunking below enforces its own per-chunk
# input budget and per-request output cap).
INPUT_TOKEN_LIMIT = 128000  # Input window size
OUTPUT_TOKEN_LIMIT = 4096  # Output window size

# Define the prompt template
PROMPT_TEMPLATE = """
I will provide you with a blog post written in an informal, chatty style. Please review the text with the following guidelines in mind:

- Correct any spelling or grammar mistakes, where necessary.
- Fix passive voice where appropriate, making sentences more direct.
- Simplify convoluted sentences while maintaining the original tone and flow, if possible.
- Clarify ambiguous pronoun references, if possible.
- Markdown links `[name](url)` should be preserved as is, as should `:::`-delimited markdown blocks and citation references `@citation`.
- **Preserve the original formatting exactly, including all spaces and line breaks.**
- Keep all informal and chatty language intact—do not overly formalize the tone.
- If a problem cannot be resolved easily, leave it as-is but mark the sentence with "🚧TODO🚧 clarify".
- Lines beginning with `>` are quotes from other people and should not be changed.
- Be careful not to leave unmatched math delimiters `$`.
- Change dumb punctuation to smart punctuation, e.g., "We're happy --- " to “We’re happy —” EXCEPT inside markdown link URLs.
- [YOUR REQUIREMENTS HERE]

**Please provide only the edited text. Do not include any explanations or additional comments.**
"""

# Initialize global token counters
total_input_tokens = 0  # Total prompt tokens
total_output_tokens = 0  # Total completion tokens
token_lock = asyncio.Lock()  # Lock for safe access to the token counters


def read(fname):
    """
    Reads a Quarto (.qmd) file, returning the YAML metadata and the body
    content separately.
    """
    with open(fname, "r", encoding="utf8") as fp:
        content = fp.read()

    # Separate the YAML metadata from the body content. maxsplit=2 leaves any
    # later '---' lines inside the body untouched.
    if content.startswith("---\n"):
        parts = content.split("---\n", 2)
        if len(parts) >= 3:
            yaml_str = parts[1]
            body_content = parts[2]
            yaml_metadata = yaml.load(yaml_str)
            return yaml_metadata, body_content

    # No YAML block: return empty metadata and the entire content as the body.
    return {}, content


def write(fname, yaml_metadata, content):
    """
    Writes the YAML metadata and the body content back to the Quarto (.qmd)
    file, updating the 'llmupdated' field in the YAML metadata.
    """
    # Update 'llmupdated' to the current date with timezone info, without
    # microseconds.
    yaml_metadata["llmupdated"] = (
        datetime.now().astimezone().replace(microsecond=0).isoformat()
    )

    with open(fname, "w", encoding="utf8") as fp:
        if yaml_metadata:
            fp.write("---\n")
            yaml.dump(yaml_metadata, fp)
            fp.write("---\n")
        fp.write(content)
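
# For reference, the file written above looks roughly like this (assuming the
# usual Quarto front matter; field values are illustrative):
#
#   ---
#   title: My post
#   date-modified: '2024-05-01'
#   llmupdated: '2024-05-02T10:15:00+01:00'
#   ---
#   <body content>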


def num_tokens_from_messages(messages, model="gpt-4"):
    """
    Estimates the number of tokens a chat request will consume, using the
    per-message overhead heuristic from the OpenAI cookbook. The exact
    overhead varies by model, so treat the result as an approximation.
    """
    encoding = tiktoken.encoding_for_model(model)
    num_tokens = 0
    for message in messages:
        num_tokens += 4  # Every message starts with <im_start>{role}\n
        for key, value in message.items():
            num_tokens += len(encoding.encode(value))
        num_tokens += 2  # Every message ends with <im_end>\n
    num_tokens += 2  # Additional tokens for the assistant's reply
    return num_tokens
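
# Rough usage (an estimate only; the authoritative counts come back in
# response.usage after each request):
#
#   num_tokens_from_messages([{"role": "user", "content": "Hello"}])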


def chunk_text_by_paragraph(content, max_tokens):
    """
    Splits the content into chunks at paragraph boundaries while ensuring each
    chunk does not exceed max_tokens.
    """
    # Split the content into paragraphs, keeping the separators (runs of
    # newlines) as their own parts so the original spacing can be reassembled
    # exactly.
    parts = re.split(r"(\n+)", content)
    chunks = []
    current_chunk = ""
    current_tokens = 0

    for part in parts:
        part_tokens = len(encoder.encode(part))
        if current_tokens + part_tokens > max_tokens and current_chunk:
            chunks.append(current_chunk)
            current_chunk = part
            current_tokens = part_tokens
        else:
            current_chunk += part
            current_tokens += part_tokens

    if current_chunk:
        chunks.append(current_chunk)

    return chunks
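
# Illustration (token counts depend on the tokenizer, so the exact split may
# differ): with a small max_tokens, "aaa\n\nbbb\n\nccc" would come back as
# something like ["aaa\n\n", "bbb\n\n", "ccc"], and "".join(chunks) always
# reproduces the input exactly.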


async def call_chatgpt(content, semaphore, fname):
    """
    Sends a chunk of content to the OpenAI API for editing.
    Returns the modified content, the finish_reason, and an error flag.
    """
    global total_input_tokens, total_output_tokens

    messages = [
        {
            "role": "system",
            "content": (
                "You are an assistant that edits text. "
                "Provide only the edited text and nothing else. "
                "**Preserve the original formatting exactly, including all spaces and line breaks and markdown formatting.** "
                "You use British spelling with the -ize variant, e.g. organize."
            ),
        },
        {"role": "user", "content": PROMPT_TEMPLATE + f"\n\nContent:\n{content}"},
    ]

    max_output_tokens = 4000  # Output token limit per request

    async with semaphore:
        try:
            response = await client.chat.completions.create(
                model=AZURE_DEPLOYMENT_NAME,
                messages=messages,
                max_tokens=max_output_tokens,
                temperature=0.2,
            )
            assistant_message = response.choices[0].message.content
            finish_reason = response.choices[0].finish_reason

            # Log usage data
            usage = response.usage.to_dict()
            logger.info(
                f"Tokens used - Prompt: {usage.get('prompt_tokens')}, "
                f"Completion: {usage.get('completion_tokens')}, "
                f"Total: {usage.get('total_tokens')} for {fname}"
            )

            # Safely update the global token counters
            async with token_lock:
                total_input_tokens += usage.get("prompt_tokens", 0)
                total_output_tokens += usage.get("completion_tokens", 0)

            return assistant_message, finish_reason, False  # No error occurred
        except APIError as e:
            logger.error(f"API Error while processing {fname}: {e}")
            return content, None, True  # Error occurred
        except Exception as e:
            logger.error(f"Unexpected error while processing {fname}: {e}")
            return content, None, True  # Error occurred
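
# On failure, call_chatgpt hands back the original chunk unchanged; the error
# flag lets the caller decide whether to abandon the whole file's update.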


async def call_chatgpt_in_chunks(content, semaphore, fname):
    """
    Processes the content in chunks, sends each chunk to ChatGPT, and
    aggregates the results. Returns the modified content and an error flag.
    """
    max_input_tokens_per_chunk = 3500  # Fixed input token limit per chunk

    logger.info(f"Max input tokens per chunk: {max_input_tokens_per_chunk}")

    # Group the content into paragraph-aligned chunks that stay within the
    # per-chunk input budget.
    chunks = chunk_text_by_paragraph(content, max_input_tokens_per_chunk)

    modified_chunks = []
    errors_occurred = False  # Initialize error flag

    for idx, chunk_text in enumerate(chunks):
        logger.info(f"Processing chunk {idx + 1}/{len(chunks)} for {fname}")

        # Capture leading and trailing newlines so they can be restored after
        # editing (the model is only shown the stripped content).
        leading_newlines = re.match(r"^(\n*)", chunk_text).group(1)
        trailing_newlines = re.search(r"(\n*)$", chunk_text).group(1)

        # Extract the content without leading/trailing newlines
        content_to_edit = chunk_text[
            len(leading_newlines) : len(chunk_text) - len(trailing_newlines)
        ]

        # If the content is empty (e.g. the chunk contains only newlines),
        # skip processing and preserve it as-is.
        if not content_to_edit.strip():
            modified_chunk_text = chunk_text
            finish_reason = None
        else:
            # Process the content
            modified_content, finish_reason, error_occurred_chunk = await call_chatgpt(
                content_to_edit, semaphore, fname
            )
            if error_occurred_chunk:
                errors_occurred = True  # An error occurred in this chunk
            # Re-add leading and trailing newlines
            modified_chunk_text = (
                leading_newlines + modified_content + trailing_newlines
            )

        modified_chunks.append(modified_chunk_text)

        if finish_reason == "length":
            logger.warning(f"Output was truncated due to the max token limit for {fname}.")

        # Optional random delay to reduce the chance of hitting rate limits
        await asyncio.sleep(random.uniform(0.1, 0.5))

    modified_content = "".join(modified_chunks)
    return modified_content, errors_occurred  # Return error flag
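
# Design note: chunks within a single file are edited one after another, which
# keeps them in order; concurrency comes from processing several files at once,
# bounded by the semaphore inside call_chatgpt.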


def _to_aware_datetime(value):
    """
    Normalizes a YAML date value to a timezone-aware datetime. ruamel may hand
    back either a string or an already-parsed datetime/date for timestamp-like
    values; naive values are assumed to be in the local timezone.
    Raises ValueError if the value cannot be parsed.
    """
    dt = value if isinstance(value, datetime) else datetime.fromisoformat(str(value))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.now().astimezone().tzinfo)
    return dt


async def process_file(fname, semaphore):
    """
    Processes a single .qmd file:
      - Checks whether the file needs processing based on 'llmupdated' and
        'date-modified'.
      - Sends the content to ChatGPT for editing.
      - Checks for length differences.
      - Updates the file and annotates the metadata if appropriate.
    Returns a tuple (fname, status) where status is 'updated', 'no_change',
    'skipped', or 'error'.
    """
    qmdname = fname.with_suffix(".qmd")
    logger.info(f"Processing file: {qmdname}")

    try:
        yaml_metadata, qmdcontent = read(qmdname)

        # Retrieve 'llmupdated' from the YAML metadata, if it exists
        llmupdated_str = yaml_metadata.get("llmupdated")
        if llmupdated_str:
            try:
                llmupdated = _to_aware_datetime(llmupdated_str)
            except ValueError:
                logger.error(
                    f"Invalid 'llmupdated' format in {fname}. Proceeding with processing."
                )
                llmupdated = None
        else:
            llmupdated = None

        # Retrieve 'date-modified' from the YAML metadata, if it exists
        date_modified_str = yaml_metadata.get("date-modified")
        if date_modified_str:
            try:
                date_modified = _to_aware_datetime(date_modified_str)
            except ValueError:
                logger.error(
                    f"Invalid 'date-modified' format in {fname}. Proceeding with processing."
                )
                date_modified = None
        else:
            date_modified = None

        # If 'date-modified' is missing, we cannot proceed
        if not date_modified:
            logger.error(
                f"No 'date-modified' found in {fname}. Cannot determine if processing is needed."
            )
            return fname, "error"

        # If 'llmupdated' exists and 'date-modified' is not newer, skip processing
        if llmupdated and date_modified <= llmupdated:
            logger.info(
                f"File {fname} has not been modified since the last LLM update. Skipping."
            )
            return fname, "skipped"

        modified_content, errors_occurred = await call_chatgpt_in_chunks(
            qmdcontent, semaphore, fname
        )

        if errors_occurred:
            logger.error(
                f"Errors occurred while processing {fname}, not updating the file."
            )
            return fname, "error"

        # Check the length difference (guard against an empty original to
        # avoid division by zero).
        original_length = len(qmdcontent)
        modified_length = len(modified_content)
        length_difference = abs(modified_length - original_length) / max(
            original_length, 1
        )

        if length_difference > 0.1:
            logger.error(
                f"Modified content length differs by more than 10% for {fname}, not updating the file."
            )
            return fname, "error"

        # Write out the content and metadata even when the body is unchanged,
        # so that 'llmupdated' records this pass.
        write(qmdname, yaml_metadata, modified_content)
        if modified_content != qmdcontent:
            logger.info(f"Content updated for {fname}")
            return fname, "updated"
        else:
            logger.info(f"No content changes for {fname}, but metadata updated")
            return fname, "no_change"

    except Exception as e:
        logger.error(f"Error processing {fname}: {e}")
        return fname, "error"


async def main(glb0="**/*.qmd", *glbs):
    """
    Main function that processes all .qmd files matching the provided glob
    patterns. Generates a report at the end listing updated, unchanged,
    skipped, and errored files, along with the total input and output tokens
    used.
    """
    semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
    tasks = []
    file_list = []

    # Collect all file paths first
    for glb in [glb0, *glbs]:
        paths = Path(".").glob(glb)
        for fname in paths:
            file_list.append(fname)

    ## Optionally shuffle the list of files to randomize processing order:
    # random.shuffle(file_list)

    # Create a task for each file
    for fname in file_list:
        tasks.append(asyncio.create_task(process_file(fname, semaphore)))

    # Execute the tasks concurrently and collect the results
    results = await asyncio.gather(*tasks)

    # Initialize lists for reporting
    updated_files = []
    no_change_files = []
    skipped_files = []
    error_files = []

    # Categorize results
    for fname, status in results:
        if status == "updated":
            updated_files.append(str(fname))
        elif status == "no_change":
            no_change_files.append(str(fname))
        elif status == "skipped":
            skipped_files.append(str(fname))
        elif status == "error":
            error_files.append(str(fname))

    # Final report
    report = "\n=== Processing Report ===\n"

    if updated_files:
        report += "Files Updated:\n"
        for file in updated_files:
            report += f" - {file}\n"
    else:
        report += "No files were updated.\n"

    if no_change_files:
        report += "\nFiles Processed with No Content Changes (Metadata Updated):\n"
        for file in no_change_files:
            report += f" - {file}\n"
    else:
        report += "\nNo files were processed without content changes.\n"

    if skipped_files:
        report += "\nFiles Skipped (Up-to-date):\n"
        for file in skipped_files:
            report += f" - {file}\n"
    else:
        report += "\nNo files were skipped.\n"

    if error_files:
        report += "\nFiles with Errors or Excessive Changes:\n"
        for file in error_files:
            report += f" - {file}\n"
    else:
        report += "\nNo files encountered errors or excessive changes.\n"

    # Add the total input and output tokens to the report
    report += f"\nTotal Input Tokens: {total_input_tokens}\n"
    report += f"Total Output Tokens: {total_output_tokens}\n"

    report += "\n=== End of Report ===\n"

    logger.info(report)


if __name__ == "__main__":
    asyncio.run(main(*sys.argv[1:]))