Created
July 6, 2025 19:24
-
-
Save mlagerberg/10d85936c6c8c1184d66d15b8d8f683e to your computer and use it in GitHub Desktop.
Going from one mail provider to the next can be cumbersome, especially when the old provider only offers an export as separate .eml files. This script converts all files from a weird base64 encoding to something more standard, which works with Thunderbird, Soverin and other mail clients and providers.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import os | |
| import os.path | |
| import email | |
| import sys | |
| from email import policy | |
| from email.header import decode_header, make_header | |
| from email.message import EmailMessage | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.text import MIMEText | |
| from email.mime.base import MIMEBase | |
| default_input_folder = "./TutaExport" | |
| default_output_folder = "./converted" | |
def decode_header_value(header_value):
    """Decode an email header value into a plain string.

    RFC 2047 encoded-words (``=?charset?B?...?=`` / ``=?charset?Q?...?=``)
    are decoded; values without encoded-words are returned unchanged.

    Args:
        header_value: The raw header value, or ``None`` for a missing header.

    Returns:
        The decoded header text. ``''`` when ``header_value`` is ``None``.
        When the value cannot be decoded (unknown charset, corrupt
        encoded-word), the original text is returned as-is instead of
        raising, so one bad header does not abort the whole conversion.
    """
    # Local import: the module-level imports do not provide this name.
    from email.errors import HeaderParseError

    if header_value is None:
        return ''
    try:
        return str(make_header(decode_header(header_value)))
    except (HeaderParseError, LookupError, UnicodeError, ValueError):
        # LookupError: unknown charset name; HeaderParseError/ValueError:
        # malformed base64; UnicodeError: bytes invalid for the charset.
        return str(header_value)
# Headers that the MIME classes regenerate themselves. Copying them from the
# source would produce duplicate or contradictory values (e.g. the original
# "Content-Transfer-Encoding" next to the freshly generated one).
_REGENERATED_HEADERS = frozenset({
    'content-type',
    'content-transfer-encoding',
    'mime-version',
})


def _copy_headers(source_msg, target_msg):
    """Append every header of source_msg to target_msg, except regenerated ones."""
    for header, value in source_msg.items():
        if header.lower() not in _REGENERATED_HEADERS:
            target_msg[header] = decode_header_value(value)


def _build_leaf_part(original_msg, payload, decoded_payload):
    """Create a fresh non-multipart MIME part carrying the decoded content."""
    # Local import: the module-level imports do not provide this name.
    from email import encoders

    maintype = original_msg.get_content_maintype()
    subtype = original_msg.get_content_subtype()
    if maintype == 'text':
        # Keep the real subtype (html, calendar, ...) instead of forcing
        # everything that is not HTML to text/plain.
        return MIMEText(decoded_payload, subtype, 'utf-8')

    # Attachments and other binary content: a raw bytes payload cannot be
    # serialized as text, so it has to be transfer-encoded.
    part = MIMEBase(maintype, subtype)
    part.set_payload(payload or b'')
    encoders.encode_base64(part)
    return part


def rebuild_eml(original_msg, depth=0, max_depth=50):
    """Rebuild an email message with a freshly generated MIME structure.

    Multipart containers are recreated with new boundaries and their parts
    are rebuilt recursively. Leaf parts are decoded and re-created as
    ``MIMEText`` (text) or base64-encoded ``MIMEBase`` (everything else).
    A leaf whose decoded payload is itself a multipart MIME document is
    parsed and rebuilt in place of the leaf.

    Args:
        original_msg: Message (or message part) parsed with ``policy.default``.
        depth: Current recursion depth; callers normally leave this at 0.
        max_depth: Recursion limit. Parts nested deeper are replaced by an
            empty ``EmailMessage``.

    Returns:
        The rebuilt message object. ``message/*`` containers (for example an
        attached ``message/rfc822``) are returned unchanged.
    """
    if depth > max_depth:
        print("Maximum recursion depth reached. Skipping part.")
        return EmailMessage()

    if original_msg.is_multipart():
        if original_msg.get_content_maintype() != 'multipart':
            # An attached message (message/rfc822 etc.) is also reported as
            # multipart; turning it into "multipart/rfc822" would be invalid,
            # so it is kept as-is.
            return original_msg

        new_msg = MIMEMultipart(original_msg.get_content_subtype())
        _copy_headers(original_msg, new_msg)
        for part in original_msg.iter_parts():
            new_msg.attach(
                rebuild_eml(part, depth=depth + 1, max_depth=max_depth)
            )
        return new_msg

    # Single part: undo the content transfer encoding first.
    charset = original_msg.get_content_charset() or 'utf-8'
    payload = original_msg.get_payload(decode=True)
    if payload:
        try:
            decoded_payload = payload.decode(charset, errors='replace')
        except (UnicodeDecodeError, LookupError):
            # Unknown or unusable charset label: fall back to UTF-8.
            decoded_payload = payload.decode('utf-8', errors='replace')
    else:
        decoded_payload = ''

    new_msg = None
    # Heuristic: the decoded payload looks like an embedded MIME document.
    if 'Content-Type:' in decoded_payload and 'boundary=' in decoded_payload:
        try:
            embedded_msg = email.message_from_string(
                decoded_payload, policy=policy.default
            )
            # Only accept a real multipart structure. A body that merely
            # mentions these strings would otherwise recurse on itself until
            # max_depth and end up as an empty message.
            if embedded_msg.is_multipart():
                new_msg = rebuild_eml(
                    embedded_msg, depth=depth + 1, max_depth=max_depth
                )
        except Exception as e:
            print(f"Failed to parse nested MIME: {e}")

    if new_msg is None:
        # Regular leaf, or fallback when the nested parse did not work out.
        new_msg = _build_leaf_part(original_msg, payload, decoded_payload)

    _copy_headers(original_msg, new_msg)
    return new_msg
def process_email_file(input_file, output_file, overwrite=False):
    """Convert a single .eml file and write the rebuilt message.

    Args:
        input_file: Path of the .eml file to read.
        output_file: Path the rebuilt message is written to.
        overwrite: Currently unused; kept for interface compatibility. The
            output file is always (over)written, so the caller decides
            whether an existing file should be processed at all.

    Returns:
        True when the file was converted and written, False on any error
        (the error is printed, not raised).
    """
    try:
        # Parse from bytes: .eml files are not guaranteed to be valid UTF-8,
        # and decoding them as text up front mangles 8-bit bodies.
        with open(input_file, 'rb') as f:
            original_msg = email.message_from_binary_file(f, policy=policy.default)

        # Rebuild and serialize BEFORE opening the output file, so a failure
        # does not leave an empty file behind that blocks a later retry.
        clean_msg = rebuild_eml(original_msg)
        serialized = clean_msg.as_string(policy=policy.default)

        # surrogateescape writes bytes the parser could not decode back
        # unchanged instead of failing on them.
        with open(output_file, 'w', encoding='utf-8', errors='surrogateescape') as f:
            f.write(serialized)
        return True
    except Exception as e:
        print(f'Failed to process {os.path.basename(input_file)}: {e}')
        return False
if __name__ == '__main__':
    # Command line entry point: convert every .eml file in the input
    # directory and write the result under the same name in the output
    # directory.
    parser = argparse.ArgumentParser(prog='convert.py', description="Converts .eml files that are base64 encoded to something Thunderbird and most mail servers understand")
    parser.add_argument('-i', '--input_dir', help="Input directory")
    parser.add_argument('-o', '--output_dir', help="Output directory")
    parser.add_argument('-f', '--force', help="Force overwriting existing files in output dir", action="store_true")
    args = parser.parse_args()

    input_folder = args.input_dir if args.input_dir else default_input_folder
    output_folder = args.output_dir if args.output_dir else default_output_folder

    # Fail with a clear usage error instead of a traceback from os.listdir.
    if not os.path.isdir(input_folder):
        parser.error(f"Input directory does not exist: {input_folder}")
    os.makedirs(output_folder, exist_ok=True)

    successful = 0
    failed = 0
    skipped = 0

    # Sorted for a deterministic, reproducible processing order.
    for filename in sorted(os.listdir(input_folder)):
        # Match the extension case-insensitively (".EML" exports exist too).
        if not filename.lower().endswith(".eml"):
            continue
        input_file = os.path.join(input_folder, filename)
        if not os.path.isfile(input_file):
            continue
        output_file = os.path.join(output_folder, filename)

        # Existing results are only replaced when --force is given.
        if os.path.exists(output_file) and not args.force:
            skipped += 1
            continue

        print(f'Processing {filename}...', end=' ', flush=True)
        if process_email_file(input_file, output_file, overwrite=args.force):
            print('✓')
            successful += 1
        else:
            print('✗')
            failed += 1

    print(f"\nDone! Successfully processed {successful} emails, {failed} failed.")
    if skipped:
        print(f"Skipped {skipped} files that already exist in the output directory (use --force to overwrite).")

    # Exit with a proper exit code
    sys.exit(0 if failed == 0 else 1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment