Created
April 24, 2025 11:38
-
-
Save DannyMac180/b5044c726cbe794f11e24f4cff5e2913 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
organize_downloads.py - A script that automatically organizes files from the Downloads
folder, using OpenAI to classify each file and move it to an appropriate folder in Documents.

Usage:
    python organize_downloads.py [options]

Options:
    --api-key KEY          OpenAI API key (can also be set via OPENAI_API_KEY env variable)
    --downloads-dir DIR    Path to Downloads directory (default: ~/Downloads)
    --documents-dir DIR    Path to Documents directory (default: ~/Documents)
    --dry-run              Preview moves without actually moving files
    --log-file FILE        Path to log file (default: organize_downloads.log)
    --skip-extensions      Comma-separated list of extensions to skip (e.g., ".tmp,.bak")
    --help                 Show this help message and exit
"""
import os | |
import sys | |
import shutil | |
import argparse | |
import logging | |
import json | |
import mimetypes | |
import time | |
from pathlib import Path | |
from datetime import datetime | |
from collections import defaultdict, Counter | |
import re | |
from openai import OpenAI | |
# Set up logging.
# Configure the root logger once at import time (timestamped, INFO and above);
# FileOrganizer later attaches an additional per-run file handler to `logger`.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)
# Constants
# Default source and destination directories (``~`` expanded at import time).
DEFAULT_DOWNLOADS_DIR = os.path.expanduser("~/Downloads")
DEFAULT_DOCUMENTS_DIR = os.path.expanduser("~/Documents")
# Default path of the run log written by FileOrganizer.
DEFAULT_LOG_FILE = "organize_downloads.log"
# Exact file names that are never moved (OS bookkeeping files).
EXCLUDED_FILES = [".DS_Store", ".localized", "desktop.ini", "Thumbs.db"]
# Suffixes that are never moved (temporary / in-progress downloads).
EXCLUDED_EXTENSIONS = [".tmp", ".crdownload", ".part", ".partial"]
# How many times a classification request is attempted before giving up.
MAX_RETRY_ATTEMPTS = 3
# Pause between classification attempts, in seconds.
RETRY_DELAY = 2
class FileOrganizer:
    """Organize files from Downloads into Documents using LLM classification.

    Each regular file directly inside ``downloads_dir`` is described by its
    metadata (name, extension, MIME type, size, timestamps), sent to an OpenAI
    chat model together with the names of the first-level folders of
    ``documents_dir``, and moved into the folder the model picks. Files the
    model cannot place go to a ``misc`` folder.
    """

    def __init__(self, api_key=None, downloads_dir=DEFAULT_DOWNLOADS_DIR,
                 documents_dir=DEFAULT_DOCUMENTS_DIR, dry_run=False,
                 log_file=DEFAULT_LOG_FILE, excluded_extensions=None):
        """Initialize the FileOrganizer with the given parameters.

        Args:
            api_key: OpenAI API key. When falsy, the ``OPENAI_API_KEY``
                environment variable is used instead.
            downloads_dir: Directory whose files are organized.
            documents_dir: Directory whose sub-folders are the destinations.
            dry_run: When true, log the planned moves but change nothing.
            log_file: Path of the log file a handler is attached for.
            excluded_extensions: Extra extensions (or dot-file names) to skip,
                in addition to ``EXCLUDED_EXTENSIONS``.

        Raises:
            ValueError: If no API key is available from either source.
        """
        self.downloads_dir = Path(downloads_dir).expanduser().resolve()
        self.documents_dir = Path(documents_dir).expanduser().resolve()
        self.dry_run = dry_run

        # Normalize user-supplied extensions so they compare equal to
        # ``Path.suffix.lower()``: lower-case, leading dot, no empty entries
        # (an empty entry would match every file that has no suffix).
        self.excluded_extensions = set(EXCLUDED_EXTENSIONS)
        for ext in excluded_extensions or ():
            ext = ext.strip().lower()
            if not ext:
                continue
            if not ext.startswith("."):
                ext = f".{ext}"
            self.excluded_extensions.add(ext)

        # Resolve the API key before touching the file system so a
        # misconfigured run fails fast instead of sending every file to
        # 'misc' after repeated authentication errors.
        resolved_key = api_key or os.environ.get("OPENAI_API_KEY")
        if not resolved_key:
            raise ValueError(
                "No OpenAI API key provided. Pass api_key / --api-key or set "
                "the OPENAI_API_KEY environment variable."
            )

        # Set up file logging
        self.log_path = Path(log_file).resolve()
        self.file_handler = logging.FileHandler(log_file, encoding="utf-8")
        self.file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(self.file_handler)

        # Initialize OpenAI client
        self.client = OpenAI(api_key=resolved_key)

        # Statistics
        self.stats = {
            'total_files': 0,
            'moved_files': 0,
            'skipped_files': 0,
            'error_files': 0,
            'destinations': Counter(),
        }

        # Movement log
        self.movement_log = []

        # Get available folders in Documents
        self.available_folders = self._get_available_folders()
        logger.info(f"Found {len(self.available_folders)} folders in Documents: {', '.join(self.available_folders)}")

    def _get_available_folders(self):
        """Return the names of the first-level directories in Documents.

        Returns an empty list (after logging the error) when the directory
        cannot be listed. Names are sorted so the prompt is deterministic.
        """
        try:
            return sorted(
                item.name for item in self.documents_dir.iterdir() if item.is_dir()
            )
        except OSError as e:
            logger.error(f"Error getting available folders: {str(e)}")
            return []

    def _get_file_info(self, file_path):
        """Collect metadata about a file for the classification prompt.

        Returns a dict with ``name``, ``extension``, ``size`` (human readable),
        ``raw_size`` (bytes), ``mime_type``, ``created`` and ``modified``.
        If the file cannot be stat'ed, returns only ``name``, ``extension``
        and ``error``.
        """
        path = Path(file_path)
        try:
            stats = path.stat()

            # Get file size
            size_bytes = stats.st_size
            if size_bytes < 1024:
                size_str = f"{size_bytes} bytes"
            elif size_bytes < 1024 * 1024:
                size_str = f"{size_bytes / 1024:.1f} KB"
            else:
                size_str = f"{size_bytes / (1024 * 1024):.1f} MB"

            # Get mime type
            mime_type, _ = mimetypes.guess_type(str(path))

            # st_ctime is the metadata-change time on Unix, not the creation
            # time; prefer st_birthtime on platforms that provide it.
            created_ts = getattr(stats, "st_birthtime", stats.st_ctime)
            creation_time = datetime.fromtimestamp(created_ts).strftime('%Y-%m-%d %H:%M:%S')
            modified_time = datetime.fromtimestamp(stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')

            return {
                "name": path.name,
                "extension": path.suffix.lower(),
                "size": size_str,
                "raw_size": size_bytes,
                "mime_type": mime_type or "Unknown",
                "created": creation_time,
                "modified": modified_time,
            }
        except Exception as e:
            logger.error(f"Error getting file info for {file_path}: {str(e)}")
            return {
                "name": path.name,
                "extension": path.suffix.lower(),
                "error": str(e),
            }

    def _match_folder(self, suggestion, file_name):
        """Map the model's raw reply onto an available folder name or 'misc'."""
        stripped = suggestion.strip()
        if stripped in self.available_folders:
            return stripped

        # Models sometimes wrap the answer in quotes/backticks or end it
        # with a period; retry the exact match without that decoration.
        cleaned = stripped.strip("\"'`.").strip()
        if cleaned in self.available_folders:
            return cleaned

        lowered = cleaned.lower()
        if lowered == "misc":
            return "misc"

        # Case-insensitive exact match before any fuzzy matching.
        for folder in self.available_folders:
            if folder.lower() == lowered:
                return folder

        # Fall back to a folder name contained in the reply. Longest names
        # first so a short name (e.g. "Tax") cannot shadow a more specific
        # one (e.g. "Tax Returns").
        for folder in sorted(self.available_folders, key=len, reverse=True):
            if folder and folder.lower() in lowered:
                return folder

        logger.warning(f"Invalid folder suggestion '{suggestion.strip()}' for {file_name}. Using 'misc'.")
        return "misc"

    def _classify_file(self, file_info):
        """Use OpenAI to classify the file and suggest a destination folder.

        Args:
            file_info: Dict produced by ``_get_file_info``. Missing keys (the
                error variant of that dict) are reported as "Unknown".

        Returns:
            The name of one of ``self.available_folders``, or ``"misc"`` when
            the model's answer cannot be matched or every attempt failed.
        """
        file_name = file_info.get('name', 'Unknown')

        # Create a prompt for the LLM
        prompt = "\n".join([
            "I need to classify this file into one of my Documents folders. Based on the file information:",
            f"- File name: {file_name}",
            f"- Extension: {file_info.get('extension', 'Unknown')}",
            f"- MIME type: {file_info.get('mime_type', 'Unknown')}",
            f"- Size: {file_info.get('size', 'Unknown')}",
            f"- Created: {file_info.get('created', 'Unknown')}",
            f"- Modified: {file_info.get('modified', 'Unknown')}",
            "The available folders in my Documents directory are:",
            f"{', '.join(self.available_folders)}",
            "Please analyze the file details and determine which folder would be the most appropriate",
            "destination for this file. Respond with the exact name of one of the available folders",
            "listed above (case-sensitive). If none of the folders are appropriate, respond with \"misc\".",
            "Only respond with a folder name, nothing else.",
        ])

        for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
            try:
                response = self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant that classifies files into folders based on their metadata."},
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=50,
                    temperature=0.2,
                )
                # The content may be None (e.g. refusals); treat as empty.
                suggested_folder = response.choices[0].message.content or ""
            except Exception as e:
                # Broad on purpose: any client/network failure is retried.
                logger.error(f"Error classifying file (attempt {attempt}/{MAX_RETRY_ATTEMPTS}): {str(e)}")
                if attempt < MAX_RETRY_ATTEMPTS:
                    time.sleep(RETRY_DELAY)
                continue

            return self._match_folder(suggested_folder, file_name)

        # Default to misc if all attempts failed
        return "misc"

    def _ensure_directory_exists(self, directory):
        """Create ``directory`` (and parents) unless this is a dry run.

        Returns True on success or in dry-run mode, False if creation failed.
        """
        if self.dry_run:
            return True
        try:
            os.makedirs(directory, exist_ok=True)
        except OSError as e:
            logger.error(f"Error creating directory {directory}: {str(e)}")
            return False
        return True

    def _handle_file_conflict(self, source_path, target_path):
        """Return a target path that does not collide with an existing file.

        If ``target_path`` is free it is returned unchanged; otherwise
        ``"name (1).ext"``, ``"name (2).ext"``, ... are tried in order.
        ``source_path`` is accepted for interface compatibility and unused.
        """
        if not target_path.exists():
            return target_path

        # File exists, generate a new name with a counter
        stem, suffix = target_path.stem, target_path.suffix
        counter = 1
        while True:
            candidate = target_path.with_name(f"{stem} ({counter}){suffix}")
            if not candidate.exists():
                return candidate
            counter += 1

    def _move_file(self, source_path, destination_folder):
        """Move a file into ``documents_dir / destination_folder``.

        Returns:
            ``(success, message)``. In dry-run mode nothing is moved and the
            message describes the move that would have happened.
        """
        try:
            # Create the full target path
            target_dir = self.documents_dir / destination_folder
            if not self._ensure_directory_exists(target_dir):
                return False, f"Failed to create directory {target_dir}"

            # Handle file conflicts
            target_path = self._handle_file_conflict(
                source_path, target_dir / source_path.name
            )

            # Move the file
            if self.dry_run:
                logger.info(f"DRY RUN: Would move {source_path} to {target_path}")
                return True, f"Would move to {target_path}"

            shutil.move(str(source_path), str(target_path))
            logger.info(f"Moved {source_path} to {target_path}")
            return True, f"Moved to {target_path}"
        except Exception as e:
            logger.error(f"Error moving file {source_path}: {str(e)}")
            return False, str(e)

    def _is_excluded(self, item):
        """Return True if ``item`` must be left in place."""
        name_lower = item.name.lower()
        return (
            item.name in EXCLUDED_FILES
            or item.suffix.lower() in self.excluded_extensions
            # Dot-files such as ".DS_Store" have an empty suffix, so also
            # compare the whole name against the user's skip list.
            or name_lower in self.excluded_extensions
            # Never move the log file this run is writing to.
            or item.resolve() == self.log_path
        )

    def organize(self):
        """Organize files from the Downloads directory.

        Returns:
            ``(movement_log, stats)``: the per-file records and the counters
            accumulated during the run.
        """
        start_time = time.time()
        logger.info(f"Starting organization of {self.downloads_dir}")

        # Create the misc folder if it doesn't exist in our available folders
        if "misc" not in self.available_folders:
            misc_dir = self.documents_dir / "misc"
            if self._ensure_directory_exists(misc_dir):
                self.available_folders.append("misc")
                if self.dry_run:
                    logger.info("DRY RUN: Would create 'misc' folder for unclassified files")
                else:
                    logger.info("Created 'misc' folder for unclassified files")

        # Snapshot the listing first: files are moved out of this directory
        # while we iterate, and sorting makes runs reproducible.
        for item in sorted(self.downloads_dir.iterdir()):
            if not item.is_file():
                continue

            # Skip excluded files
            if self._is_excluded(item):
                logger.debug(f"Skipping excluded file: {item.name}")
                self.stats['skipped_files'] += 1
                continue

            self.stats['total_files'] += 1
            logger.info(f"Processing file: {item.name}")

            # Get file information
            file_info = self._get_file_info(item)

            # Classify the file
            destination = self._classify_file(file_info)
            logger.info(f"Classified {item.name} to destination: {destination}")

            # Move the file
            success, message = self._move_file(item, destination)

            # Update statistics and log
            if success:
                self.stats['moved_files'] += 1
                self.stats['destinations'][destination] += 1
            else:
                self.stats['error_files'] += 1

            # Record the movement
            self.movement_log.append({
                "file": item.name,
                "source": str(item),
                "destination": destination,
                "success": success,
                "message": message,
                "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            })

        # Log summary
        elapsed_time = time.time() - start_time
        logger.info(f"Organization completed in {elapsed_time:.2f} seconds")
        self._log_summary()
        return self.movement_log, self.stats

    def _log_summary(self):
        """Log a run summary and dump the detailed movement log to JSON.

        The JSON file is written to the current working directory under a
        timestamped name; failure to write it is logged, not raised.
        """
        lines = [
            "",
            "=" * 50,
            "ORGANIZATION SUMMARY",
            "=" * 50,
            f"Total files processed: {self.stats['total_files']}",
            f"Files moved: {self.stats['moved_files']}",
            f"Files skipped: {self.stats['skipped_files']}",
            f"Files with errors: {self.stats['error_files']}",
            "",
        ]
        if self.stats['destinations']:
            lines.append("Destination breakdown:")
            lines.extend(
                f" - {dest}: {count} files"
                for dest, count in self.stats['destinations'].most_common()
            )
        logger.info("\n".join(lines) + "\n")

        # Save movement log to JSON
        log_filename = f"organize_downloads_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        try:
            with open(log_filename, 'w', encoding='utf-8') as f:
                json.dump({
                    # Counter is not JSON-serializable as such; store a dict.
                    "stats": {k: dict(v) if isinstance(v, Counter) else v
                              for k, v in self.stats.items()},
                    "movements": self.movement_log,
                }, f, indent=2)
            logger.info(f"Detailed log saved to {log_filename}")
        except Exception as e:
            logger.error(f"Error saving detailed log: {str(e)}")
def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, as before.

    Returns:
        The ``argparse.Namespace`` with ``api_key``, ``downloads_dir``,
        ``documents_dir``, ``dry_run``, ``log_file`` and ``skip_extensions``.
    """
    parser = argparse.ArgumentParser(
        description="Organize files from Downloads folder using OpenAI classification"
    )
    parser.add_argument(
        "--api-key",
        help="OpenAI API key (can also be set via OPENAI_API_KEY env variable)",
    )
    # %(default)s lets argparse render the default itself, which stays safe
    # even if the path happens to contain a '%' character.
    parser.add_argument(
        "--downloads-dir",
        default=DEFAULT_DOWNLOADS_DIR,
        help="Path to Downloads directory (default: %(default)s)",
    )
    parser.add_argument(
        "--documents-dir",
        default=DEFAULT_DOCUMENTS_DIR,
        help="Path to Documents directory (default: %(default)s)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview moves without actually moving files",
    )
    parser.add_argument(
        "--log-file",
        default=DEFAULT_LOG_FILE,
        help="Path to log file (default: %(default)s)",
    )
    parser.add_argument(
        "--skip-extensions",
        help="Comma-separated list of extensions to skip (e.g., '.tmp,.bak')",
    )
    return parser.parse_args(argv)
def main():
    """Run the file organizer from the command line.

    Returns:
        Process exit code: 0 on success, 1 on an access or unexpected error,
        130 when interrupted with Ctrl-C.
    """
    args = parse_arguments()

    # Process excluded extensions: trim, lower-case, ensure a leading dot and
    # drop empty entries. An empty entry (e.g. from a trailing comma) would
    # otherwise match every file that has no suffix at all.
    excluded_extensions = None
    if args.skip_extensions:
        cleaned = (ext.strip().lower() for ext in args.skip_extensions.split(','))
        excluded_extensions = [
            ext if ext.startswith('.') else f".{ext}" for ext in cleaned if ext
        ] or None

    # Create and run the file organizer
    try:
        organizer = FileOrganizer(
            api_key=args.api_key,
            downloads_dir=args.downloads_dir,
            documents_dir=args.documents_dir,
            dry_run=args.dry_run,
            log_file=args.log_file,
            excluded_extensions=excluded_extensions,
        )

        # Verify access to directories
        if not os.access(organizer.downloads_dir, os.R_OK):
            logger.error(f"Cannot read from Downloads directory: {organizer.downloads_dir}")
            return 1
        # A dry run never writes, so write access is only required otherwise.
        if not args.dry_run and not os.access(organizer.documents_dir, os.W_OK):
            logger.error(f"Cannot write to Documents directory: {organizer.documents_dir}")
            return 1

        # Run the organization
        _, stats = organizer.organize()

        # Print a simple summary to the console
        print("\nOrganization complete!")
        print(f"Files processed: {stats['total_files']}")
        print(f"Files moved: {stats['moved_files']}")
        print(f"Files skipped: {stats['skipped_files']}")
        print(f"Files with errors: {stats['error_files']}")
        return 0
    except KeyboardInterrupt:
        print("\nOperation cancelled by user.")
        return 130
    except Exception as e:
        # Top-level boundary: record the traceback so failures are diagnosable.
        logger.exception(f"An unexpected error occurred: {str(e)}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment