Created
August 18, 2025 01:03
-
-
Save itsnotyoutoday/f9a8e6b30c136e515e1499c6f4467552 to your computer and use it in GitHub Desktop.
MailJet / Godaddy DomainManager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Domain Management Script for Mailjet and GoDaddy Integration | |
Adds domains to Mailjet and configures DNS records via GoDaddy API | |
""" | |
import sys | |
import json | |
import requests | |
import time | |
import argparse | |
import os | |
from typing import Dict, List, Optional | |
from dotenv import load_dotenv | |
# Pull settings from a local .env file into the process environment before
# any of the values below are read.
load_dotenv()

# The official Mailjet client is optional: when it cannot be imported the
# script talks to the REST API directly through requests.
try:
    from mailjet_rest import Client
except ImportError:
    MAILJET_AVAILABLE = False
    print("⚠️ mailjet-rest not installed, falling back to requests")
else:
    MAILJET_AVAILABLE = True

# Mailjet credentials.
MAILJET_API_KEY = os.environ.get("MAILJET_API_KEY")
MAILJET_API_SECRET = os.environ.get("MAILJET_API_SECRET")

# GoDaddy credentials and API root.
GODADDY_API_KEY = os.environ.get("GODADDY_API_KEY")
GODADDY_API_SECRET = os.environ.get("GODADDY_API_SECRET")
GODADDY_BASE_URL = "https://api.godaddy.com/v1"

# Abort at import time unless every credential is present and non-empty.
if not (MAILJET_API_KEY and MAILJET_API_SECRET and GODADDY_API_KEY and GODADDY_API_SECRET):
    print("✗ Missing required environment variables. Please check your .env file.")
    print("Required variables: MAILJET_API_KEY, MAILJET_API_SECRET, GODADDY_API_KEY, GODADDY_API_SECRET")
    sys.exit(1)
class DomainManager:
    """Coordinate Mailjet sender setup with GoDaddy DNS changes for a domain."""

    def __init__(self):
        """Prepare Mailjet access and the GoDaddy request headers.

        When the official Mailjet client is importable a ``Client`` is stored
        on ``self.mailjet``; otherwise the credential pair is kept on
        ``self.mailjet_auth`` for use as HTTP basic auth. The GoDaddy
        ``sso-key`` headers are always stored on ``self.godaddy_headers``.
        """
        mailjet_credentials = (MAILJET_API_KEY, MAILJET_API_SECRET)
        if not MAILJET_AVAILABLE:
            self.mailjet_auth = mailjet_credentials
        else:
            self.mailjet = Client(auth=mailjet_credentials, version='v3')

        sso_pair = f'{GODADDY_API_KEY}:{GODADDY_API_SECRET}'
        self.godaddy_headers = {
            'Authorization': f'sso-key {sso_pair}',
            'Content-Type': 'application/json',
        }
def find_existing_domain_in_mailjet(self, domain: str) -> Optional[Dict]:
    """Look up the catch-all sender (``*@domain``) in Mailjet.

    Uses the official client when ``MAILJET_AVAILABLE`` is true and plain
    ``requests`` otherwise; both paths share the same response handling.

    Args:
        domain: Domain name whose ``*@domain`` sender is searched for.

    Returns:
        The first entry of the response's ``Data`` list, or ``None`` when the
        list is empty, the status code is not 200, or any exception is raised
        (the exception is printed, not propagated).
    """
    sender_filter = {'Email': f'*@{domain}'}
    try:
        if MAILJET_AVAILABLE:
            response = self.mailjet.sender.get(filters=sender_filter)
        else:
            # Bound the wait so an unresponsive API cannot hang the script.
            response = requests.get(
                "https://api.mailjet.com/v3/REST/sender",
                auth=self.mailjet_auth,
                params=sender_filter,
                timeout=30,
            )
        if response.status_code != 200:
            return None
        senders = response.json()['Data']
        if not senders:
            return None
        print(f"✓ Domain {domain} already exists in Mailjet with ID: {senders[0]['ID']}")
        return senders[0]
    except Exception as e:
        print(f"⚠️ Error checking existing domain: {e}")
        return None
def add_domain_to_mailjet(self, domain: str) -> Optional[Dict]:
    """Ensure a catch-all sender for ``domain`` exists in Mailjet.

    An existing sender (as reported by ``find_existing_domain_in_mailjet``)
    is returned unchanged; otherwise a ``*@domain`` sender is created.

    Args:
        domain: Domain name to register.

    Returns:
        The sender record (existing or newly created), or ``None`` when the
        API does not answer 201 or an exception is raised.
    """
    print(f"Checking if domain {domain} exists in Mailjet...")
    existing_domain = self.find_existing_domain_in_mailjet(domain)
    if existing_domain:
        return existing_domain

    print(f"Domain not found, adding {domain} to Mailjet...")
    # A "*@domain" address registers the whole domain as a sender.
    sender_data = {
        "Email": f"*@{domain}",
        "Name": f"Domain {domain}",
    }
    try:
        if MAILJET_AVAILABLE:
            response = self.mailjet.sender.create(data=sender_data)
        else:
            # Bound the wait so an unresponsive API cannot hang the script.
            response = requests.post(
                "https://api.mailjet.com/v3/REST/sender",
                auth=self.mailjet_auth,
                headers={'Content-Type': 'application/json'},
                json=sender_data,
                timeout=30,
            )
        if response.status_code != 201:
            # Report the raw body: decoding an error page as JSON could
            # raise and hide the status code we want to show.
            print(f"✗ Failed to add domain: {response.status_code} - {response.text}")
            return None
        created = response.json()['Data'][0]
        print(f"✓ Domain added to Mailjet with ID: {created['ID']}")
        return created
    except Exception as e:
        print(f"✗ Error adding domain to Mailjet: {e}")
        return None
def get_domain_verification_info(self, sender_id: int) -> Optional[Dict]:
    """Fetch a sender and the Mailjet DNS record set that belongs to it.

    The DNS resource ID is taken from the sender's ``DomainID`` field, then
    its ``DNSID`` field, and as a last resort from a ``dns`` lookup filtered
    by the domain part of the sender's e-mail address.

    Args:
        sender_id: Mailjet sender ID.

    Returns:
        ``{'sender': <sender record>, 'dns': <dns record>}``, or ``None`` when
        any request does not answer 200, the DNS ID cannot be determined, or
        an exception is raised.
    """
    print(f"Getting verification info for sender ID {sender_id}...")

    def fetch(resource: str, record_id=None, filters=None):
        """GET a Mailjet REST resource through whichever backend is available."""
        if MAILJET_AVAILABLE:
            endpoint = getattr(self.mailjet, resource)
            if record_id is not None:
                return endpoint.get(id=record_id)
            return endpoint.get(filters=filters)
        url = f"https://api.mailjet.com/v3/REST/{resource}"
        if record_id is not None:
            url = f"{url}/{record_id}"
        # Bound the wait so an unresponsive API cannot hang the script.
        return requests.get(url, auth=self.mailjet_auth, params=filters, timeout=30)

    try:
        sender_response = fetch('sender', record_id=sender_id)
        if sender_response.status_code != 200:
            print(f"✗ Failed to get sender info: {sender_response.status_code}")
            return None
        sender_data = sender_response.json()['Data'][0]
        print("✓ Retrieved sender info")
        print(f"Debug: Sender data: {json.dumps(sender_data, indent=2)}")

        # Resolve the ID of the DNS resource tied to this sender.
        if 'DomainID' in sender_data:
            domain_id = sender_data['DomainID']
        elif 'DNSID' in sender_data:
            domain_id = sender_data['DNSID']
            print(f"Using DNSID from sender data: {domain_id}")
        else:
            sender_email = sender_data.get('Email', '')
            if '@' not in sender_email:
                print(f"✗ Could not extract domain from sender email: {sender_email}")
                return None
            domain_name = sender_email.rpartition('@')[2]
            print(f"Extracting domain '{domain_name}' from sender email")
            domain_response = fetch('dns', filters={'Domain': domain_name})
            matches = domain_response.json()['Data'] if domain_response.status_code == 200 else []
            if not matches:
                print(f"✗ Could not find domain ID for {domain_name}")
                return None
            domain_id = matches[0]['ID']

        dns_response = fetch('dns', record_id=domain_id)
        if dns_response.status_code != 200:
            print(f"✗ Failed to get DNS info: {dns_response.status_code}")
            return None
        dns_data = dns_response.json()['Data'][0]
        print("✓ Retrieved DNS verification info")
        return {
            'sender': sender_data,
            'dns': dns_data,
        }
    except Exception as e:
        print(f"✗ Error getting verification info: {e}")
        return None
def add_godaddy_dns_record(self, domain: str, record_type: str, name: str, data: str, ttl: int = 3600, replace_existing: bool = True, priority: Optional[int] = None) -> bool:
    """Create or replace a single DNS record through the GoDaddy API.

    Args:
        domain: Zone the record belongs to.
        record_type: DNS record type, e.g. ``"TXT"`` or ``"MX"``.
        name: Record name relative to the zone (``"@"`` for the apex).
        data: Record value. For MX records without an explicit ``priority``
            a leading ``"<number> "`` is split off and used as the priority.
        ttl: Time to live in seconds.
        replace_existing: When true, PUT to ``records/{type}/{name}``
            (replacing what is stored under that type and name); when false,
            PATCH ``records`` to add alongside existing entries.
        priority: Explicit MX priority; takes precedence over one embedded
            in ``data``.

    Returns:
        ``True`` when the API answers 200, ``False`` on any other status,
        missing credentials, or a raised exception.
    """
    # Show the apex as the bare domain instead of "@.domain".
    display_name = domain if name == "@" else f"{name}.{domain}"
    print(f"Adding {record_type} record: {display_name} -> {data}")

    if not GODADDY_API_KEY or not GODADDY_API_SECRET:
        print("✗ GoDaddy API credentials not configured")
        return False

    record = {
        "type": record_type,
        "name": name,
        "data": data,
        "ttl": ttl,
    }
    if record_type == "MX":
        if priority is not None:
            record["priority"] = priority
        else:
            # Accept the zone-file spelling "10 hostname".
            leading, _, host = data.partition(" ")
            if host and leading.isdigit():
                record["priority"] = int(leading)
                record["data"] = host

    verb_done = 'updated' if replace_existing else 'added'
    verb_base = 'update' if replace_existing else 'add'
    verb_ing = 'updating' if replace_existing else 'adding'
    try:
        if replace_existing:
            response = requests.put(
                f"{GODADDY_BASE_URL}/domains/{domain}/records/{record_type}/{name}",
                headers=self.godaddy_headers,
                json=[record],
                timeout=30,  # never wait forever on the DNS API
            )
        else:
            response = requests.patch(
                f"{GODADDY_BASE_URL}/domains/{domain}/records",
                headers=self.godaddy_headers,
                json=[record],
                timeout=30,  # never wait forever on the DNS API
            )
        if response.status_code == 200:
            print(f"✓ DNS record {verb_done} successfully")
            return True
        print(f"✗ Failed to {verb_base} DNS record: {response.status_code} - {response.text}")
        return False
    except Exception as e:
        print(f"✗ Error {verb_ing} DNS record: {e}")
        return False
def setup_mailjet_dns_records(self, domain: str, dns_info: Dict) -> bool:
    """Publish the DNS records needed for Mailjet plus the local mail server.

    Writes, in order: a merged SPF TXT record at the apex, the Mailjet DKIM
    TXT record (first recognised field pair in ``dns_info``), the Zimbra
    DKIM TXT record under the ``zimbra`` selector, a DMARC TXT record, an MX
    record pointing at the local server, and the Mailjet ownership TXT
    record (first recognised field pair).

    Args:
        domain: Zone to update.
        dns_info: DNS record set returned by Mailjet for the domain.

    Returns:
        ``True`` when every attempted record write succeeded, else ``False``.
        A missing Zimbra DKIM key only prints a warning.
    """
    print(f"Setting up DNS records for {domain}...")
    print(f"Debug: DNS info received: {json.dumps(dns_info, indent=2)}")
    success = True
    server_hostname = "mail.latinaloveconnections.com"
    own_mechanism = f"a:{server_hostname}"

    # 1. SPF: start from a default and merge in whatever Mailjet supplied.
    spf_record = f"v=spf1 include:spf.mailjet.com {own_mechanism} ~all"
    for field in ('SPFRecordValue', 'SPF', 'spf_record', 'SPFValue'):
        mailjet_spf = dns_info.get(field)
        if not mailjet_spf:
            continue
        print(f"Mailjet provided SPF ({field}): {mailjet_spf}")
        if mailjet_spf.startswith('v=spf1'):
            # Drop every "all" term regardless of qualifier (+ - ~ ?): SPF
            # evaluation stops at "all", so leaving one in the middle would
            # make the mechanism for our own server unreachable.
            mechanisms = [
                term for term in mailjet_spf.split()[1:]
                if term.lstrip('+-~?').lower() != 'all' and term != own_mechanism
            ]
            spf_record = " ".join(["v=spf1", *mechanisms, own_mechanism, "~all"])
        break
    print(f"Setting merged SPF record: {spf_record}")
    if not self.add_godaddy_dns_record(domain, "TXT", "@", spf_record):
        success = False

    # 2. Mailjet DKIM: use the first name/value pair present.
    dkim_records_added = 0
    dkim_fields = [
        ('DKIMRecordName', 'DKIMRecordValue'),
        ('DKIM', 'DKIMValue'),
        ('dkim_record_name', 'dkim_record_value'),
        ('DKIMSelector', 'DKIMPublicKey'),
    ]
    for name_field, value_field in dkim_fields:
        if name_field in dns_info and value_field in dns_info:
            dkim_value = dns_info[value_field]
            # Keep only the selector (text before the first dot) and rebuild
            # the name relative to the zone.
            selector_part = dns_info[name_field].split('.')[0]
            dkim_name = f"{selector_part}._domainkey"
            print(f"Setting Mailjet DKIM record: {dkim_name} -> {dkim_value}")
            if self.add_godaddy_dns_record(domain, "TXT", dkim_name, dkim_value):
                dkim_records_added += 1
            else:
                success = False
            break

    # Surface DKIM-looking fields this method does not know how to use.
    known_dkim_keys = {key for pair in dkim_fields for key in pair}
    for key in dns_info:
        if 'dkim' in key.lower() and key not in known_dkim_keys:
            print(f"Found additional DKIM field: {key} = {dns_info[key]}")

    # 3. DKIM for the local Zimbra server, under its own selector.
    zimbra_dkim_selector = "zimbra"
    zimbra_dkim_record = self.get_zimbra_dkim_record(domain)
    if zimbra_dkim_record:
        print(f"Setting Zimbra DKIM record: {zimbra_dkim_selector}._domainkey")
        if self.add_godaddy_dns_record(domain, "TXT", f"{zimbra_dkim_selector}._domainkey", zimbra_dkim_record):
            dkim_records_added += 1
        else:
            success = False
    else:
        print("⚠️ Could not retrieve Zimbra DKIM record - may need manual setup")
    print(f"✓ Added {dkim_records_added} DKIM records")

    # 4. DMARC: the same relaxed-alignment policy is written whether or not
    # one already exists; an existing policy is only reported first.
    existing_dmarc = self.get_existing_dmarc(domain)
    if existing_dmarc:
        print(f"⚠️ Existing DMARC policy found: {existing_dmarc}")
        print("🔄 Updating DMARC to support both Mailjet and server...")
    dmarc_record = f"v=DMARC1; p=quarantine; rua=mailto:dmarc@{domain}; ruf=mailto:dmarc@{domain}; fo=1; adkim=r; aspf=r"
    print(f"Setting DMARC record: {dmarc_record}")
    if not self.add_godaddy_dns_record(domain, "TXT", "_dmarc", dmarc_record, replace_existing=True):
        success = False

    # 5. MX pointing at the local server.
    print(f"Setting MX record pointing to {server_hostname}")
    if not self.add_godaddy_dns_record(domain, "MX", "@", server_hostname, priority=10):
        success = False

    # 6. Ownership/verification record: use the first pair present.
    verification_fields = [
        ('OwnerShipTokenRecordName', 'OwnerShipToken'),
        ('VerificationRecordName', 'VerificationRecordValue'),
        ('DomainValidationRecordName', 'DomainValidationRecordValue'),
        ('Filename', 'Token'),
        ('verification_record', 'verification_value'),
    ]
    zone_suffix = f'.{domain}.'
    for name_field, value_field in verification_fields:
        if name_field in dns_info and value_field in dns_info:
            verify_name = dns_info[name_field]
            verify_value = dns_info[value_field]
            if verify_name.endswith(zone_suffix):
                # Strip only the trailing zone; an equal substring earlier
                # in the name must survive.
                verify_name = verify_name[:-len(zone_suffix)]
            elif verify_name.endswith('.'):
                verify_name = verify_name[:-1]
            print(f"Setting verification record: {verify_name} -> {verify_value}")
            if not self.add_godaddy_dns_record(domain, "TXT", verify_name, verify_value):
                success = False
            break

    # 7. Dump everything Mailjet sent, for troubleshooting.
    print("\n=== All DNS fields received from Mailjet ===")
    for key, value in dns_info.items():
        print(f"{key}: {value}")
    print("===========================================\n")
    return success
def get_zimbra_dkim_record(self, domain: str) -> Optional[str]:
    """Return the DKIM TXT value for ``domain`` from Zimbra, creating a key if needed.

    Reads ``zimbraDKIMPublicKey`` with ``zmprov`` as the ``zimbra`` user. If
    no key is found, DKIM signing is enabled for the domain, a key is
    generated with ``zmdkimkeyutil`` using selector ``zimbra``, and the
    attribute is read again.

    Args:
        domain: Domain whose DKIM key is requested.

    Returns:
        The TXT record value (``v=DKIM1; k=rsa; p=...``), or ``None`` when
        the key cannot be read or generated or an exception is raised.
    """
    try:
        import re
        import shlex
        import subprocess

        # The command string is interpreted by a shell under `su -c`, so the
        # domain is quoted to keep shell metacharacters inert.
        quoted_domain = shlex.quote(domain)

        def run_as_zimbra(command: str):
            """Run a shell command as the zimbra user, capturing text output."""
            return subprocess.run(
                ['su', '-', 'zimbra', '-c', command],
                capture_output=True, text=True,
            )

        def read_published_key() -> Optional[str]:
            """Read zimbraDKIMPublicKey and turn it into a TXT record value."""
            lookup = run_as_zimbra(f'zmprov gd {quoted_domain} zimbraDKIMPublicKey')
            output = lookup.stdout.strip()
            if lookup.returncode != 0 or 'zimbraDKIMPublicKey:' not in output:
                return None
            key_data = output.split('zimbraDKIMPublicKey:')[1].strip()
            # NOTE(review): the attribute appears to hold zone-file text of
            # the form `sel._domainkey IN TXT ( "v=DKIM1; k=rsa; " "p=..." )`
            # - confirm on a live server. When quoted chunks are present they
            # already are the record value once joined.
            chunks = re.findall(r'"([^"]*)"', key_data)
            if chunks:
                return "".join(chunks)
            if key_data.startswith('v=DKIM1'):
                return key_data
            # Bare key material: wrap it as a DKIM record.
            return f"v=DKIM1; k=rsa; p={key_data}"

        record = read_published_key()
        if record:
            return record

        print("⚠️ No DKIM key found, enabling DKIM signing...")
        enable_result = run_as_zimbra(f'zmprov md {quoted_domain} zimbraDKIMSigningEnabled TRUE')
        if enable_result.returncode != 0:
            print(f"⚠️ Failed to enable DKIM signing: {enable_result.stderr}")
            return None
        print("✓ DKIM signing enabled")

        gen_result = run_as_zimbra(f'/opt/zimbra/libexec/zmdkimkeyutil -a -d {quoted_domain} -s zimbra')
        if gen_result.returncode != 0:
            print(f"⚠️ DKIM key generation failed: {gen_result.stderr}")
            return None
        print("✓ DKIM key generated, retrieving...")
        return read_published_key()
    except Exception as e:
        print(f"✗ Error getting Zimbra DKIM record: {e}")
        return None
def trigger_domain_verification(self, sender_id: int) -> bool:
    """Ask Mailjet to validate the given sender.

    Args:
        sender_id: Mailjet sender ID to validate.

    Returns:
        ``True`` when the validate call answers 200, ``False`` on any other
        status or when an exception is raised.
    """
    print(f"Triggering verification for sender ID {sender_id}...")
    try:
        if MAILJET_AVAILABLE:
            response = self.mailjet.sender_validate.create(id=sender_id, data={})
        else:
            # Bound the wait so an unresponsive API cannot hang the script.
            response = requests.post(
                f"https://api.mailjet.com/v3/REST/sender/{sender_id}/validate",
                auth=self.mailjet_auth,
                headers={'Content-Type': 'application/json'},
                json={},
                timeout=30,
            )
        if response.status_code == 200:
            print("✓ Domain verification triggered successfully")
            return True
        # Report the raw body: decoding an error page as JSON could raise
        # and hide the status code we want to show.
        print(f"✗ Failed to trigger verification: {response.status_code} - {response.text}")
        return False
    except Exception as e:
        print(f"✗ Error triggering verification: {e}")
        return False
def add_domain_to_transport_maps(self, domain: str) -> bool:
    """Append ``@domain mailjet-api:`` to the Zimbra sender transport map.

    The entry is added only when no active (non-comment) line already starts
    with it.

    Args:
        domain: Sender domain to route through the ``mailjet-api`` transport.

    Returns:
        ``True`` when the entry is present afterwards, ``False`` when the
        file cannot be read or written.
    """
    print(f"Adding {domain} to sender transport maps...")
    maps_path = '/opt/zimbra/conf/sender_transport_maps'
    domain_entry = f"@{domain} mailjet-api:"
    try:
        with open(maps_path, 'r') as f:
            current_maps = f.read()

        # Compare line by line with whitespace collapsed, ignoring comments,
        # so a commented-out entry does not count as configured.
        active_lines = (
            " ".join(line.split())
            for line in current_maps.splitlines()
            if line.strip() and not line.lstrip().startswith('#')
        )
        if any(line.startswith(domain_entry) for line in active_lines):
            print("✓ Domain already in transport maps")
            return True

        # If the file does not end with a newline, start a fresh line so the
        # new entry is not glued onto the previous one.
        separator = "\n" if current_maps and not current_maps.endswith("\n") else ""
        with open(maps_path, 'a') as f:
            f.write(f"{separator}{domain_entry}\n")
        print("✓ Added domain to transport maps")
        return True
    except Exception as e:
        print(f"✗ Error updating transport maps: {e}")
        return False
def rebuild_transport_maps(self) -> bool:
    """Recompile the sender transport map and reload Postfix.

    Runs ``postmap`` on the map file followed by ``postfix reload`` in one
    shell invocation as the ``zimbra`` user.

    Returns:
        ``True`` when the command exits with status 0, ``False`` on a
        non-zero exit (stderr is printed) or when an exception is raised.
    """
    print("Rebuilding transport maps...")
    try:
        import subprocess

        shell_command = 'cd /opt/zimbra && postmap /opt/zimbra/conf/sender_transport_maps && postfix reload'
        completed = subprocess.run(
            ['su', '-', 'zimbra', '-c', shell_command],
            capture_output=True,
            text=True,
        )
        if completed.returncode != 0:
            print(f"✗ Failed to rebuild transport maps: {completed.stderr}")
            return False
        print("✓ Transport maps rebuilt successfully")
        return True
    except Exception as e:
        print(f"✗ Error rebuilding transport maps: {e}")
        return False
def setup_mx_only(self, domain: str) -> bool:
    """Point a domain's mail at this server: MX, SPF, Zimbra DKIM and DMARC.

    Args:
        domain: Zone to configure.

    Returns:
        ``True`` when every attempted DNS write succeeded, else ``False``.
        A missing Zimbra DKIM key only prints a warning.
    """
    print(f"\n=== Setting up {domain} for direct mail server delivery ===")
    success = True
    server_hostname = "mail.latinaloveconnections.com"
    # The DNS name must use the selector the key is signed with. This class
    # generates Zimbra keys with selector "zimbra", so publishing under a
    # different selector would leave verifiers unable to find the key.
    zimbra_dkim_selector = "zimbra"

    # 1. MX pointing at this server.
    print(f"Setting MX record pointing to {server_hostname}")
    if not self.add_godaddy_dns_record(domain, "MX", "@", server_hostname, priority=10):
        success = False

    # 2. SPF authorising only this server.
    spf_record = f"v=spf1 a:{server_hostname} mx ~all"
    print(f"Setting SPF record: {spf_record}")
    if not self.add_godaddy_dns_record(domain, "TXT", "@", spf_record):
        success = False

    # 3. Zimbra DKIM key.
    zimbra_dkim_record = self.get_zimbra_dkim_record(domain)
    if zimbra_dkim_record:
        dkim_name = f"{zimbra_dkim_selector}._domainkey"
        print(f"Setting Zimbra DKIM record: {dkim_name}")
        if not self.add_godaddy_dns_record(domain, "TXT", dkim_name, zimbra_dkim_record):
            success = False
    else:
        print("⚠️ Could not retrieve/generate Zimbra DKIM record")

    # 4. DMARC: the same strict-alignment policy is written whether or not
    # one already exists; an existing policy is only reported first.
    existing_dmarc = self.get_existing_dmarc(domain)
    if existing_dmarc:
        print(f"⚠️ Existing DMARC policy found: {existing_dmarc}")
        print("🔄 Updating DMARC for direct mail server...")
    dmarc_record = f"v=DMARC1; p=quarantine; rua=mailto:dmarc@{domain}; fo=1; adkim=s; aspf=s"
    print(f"Setting DMARC record: {dmarc_record}")
    if not self.add_godaddy_dns_record(domain, "TXT", "_dmarc", dmarc_record, replace_existing=True):
        success = False

    if success:
        print(f"✓ Domain {domain} configured for direct mail server delivery!")
        print(f"📧 Create mailboxes in Zimbra for {domain} users")
        print(f"🔍 Test: dig MX {domain}")
    else:
        print(f"✗ Failed to configure {domain} for mail server")
    return success
def process_mailjet_domain(self, domain: str) -> bool:
    """Run the full Mailjet onboarding sequence for a domain.

    Steps, each aborting the sequence on failure: register the sender, fetch
    its verification data, publish DNS records, wait 30 seconds, trigger
    verification, add the transport map entry, rebuild the transport maps.

    Args:
        domain: Domain to onboard.

    Returns:
        ``True`` when every step succeeded, ``False`` at the first failure.
    """
    print(f"\n=== Setting up {domain} for Mailjet delivery ===")

    # Steps 1-2: sender registration and its verification data.
    sender_data = self.add_domain_to_mailjet(domain)
    if not sender_data:
        return False
    sender_id = sender_data['ID']

    verification_info = self.get_domain_verification_info(sender_id)
    if not verification_info:
        return False

    # Step 3: DNS records.
    dns_ok = self.setup_mailjet_dns_records(domain, verification_info['dns'])
    if not dns_ok:
        print("✗ Failed to setup DNS records")
        return False

    # Step 4: give the new records a moment to become visible.
    print("⏳ Waiting 30 seconds for DNS propagation...")
    time.sleep(30)

    # Steps 5-7 run lazily in order; all() stops at the first failure.
    remaining_steps = (
        lambda: self.trigger_domain_verification(sender_id),
        lambda: self.add_domain_to_transport_maps(domain),
        lambda: self.rebuild_transport_maps(),
    )
    if not all(step() for step in remaining_steps):
        return False

    closing_lines = (
        f"✓ Domain {domain} setup completed for Mailjet delivery!",
        "\nNext steps:",
        "1. Check Mailjet dashboard for verification status",
        "2. Test email delivery from the new domain",
        "3. Monitor logs for any issues",
    )
    for line in closing_lines:
        print(line)
    return True
def get_godaddy_dns_records(self, domain: str) -> Optional[List[Dict]]:
    """Fetch every DNS record of a zone from GoDaddy.

    Args:
        domain: Zone to read.

    Returns:
        The decoded JSON body of a 200 response, or ``None`` on any other
        status (status and body are printed) or when an exception is raised.
    """
    try:
        response = requests.get(
            f"{GODADDY_BASE_URL}/domains/{domain}/records",
            headers=self.godaddy_headers,
            timeout=30,  # never wait forever on the DNS API
        )
        if response.status_code != 200:
            print(f"✗ Failed to get DNS records: {response.status_code} - {response.text}")
            return None
        return response.json()
    except Exception as e:
        print(f"✗ Error getting DNS records: {e}")
        return None
def check_mailjet_domain_status(self, domain: str) -> Optional[Dict]:
    """Report whether Mailjet has a ``*@domain`` sender.

    Args:
        domain: Domain to look up.

    Returns:
        ``{'exists': True, 'senders': [...], 'count': n}`` when senders are
        found, ``{'exists': False}`` when none are, and
        ``{'exists': False, 'error': <text>}`` on a non-200 status or a
        raised exception.
    """
    sender_filter = {'Email': f'*@{domain}'}
    try:
        if MAILJET_AVAILABLE:
            response = self.mailjet.sender.get(filters=sender_filter)
        else:
            # Bound the wait so an unresponsive API cannot hang the script.
            response = requests.get(
                "https://api.mailjet.com/v3/REST/sender",
                auth=self.mailjet_auth,
                params=sender_filter,
                timeout=30,
            )
        if response.status_code != 200:
            return {'exists': False, 'error': f"API error: {response.status_code}"}
        senders = response.json()['Data']
        if not senders:
            return {'exists': False}
        return {
            'exists': True,
            'senders': senders,
            'count': len(senders),
        }
    except Exception as e:
        return {'exists': False, 'error': str(e)}
def display_domain_info(self, domain: str) -> bool:
    """Print a read-only report of a domain's DNS, Mailjet and transport state.

    The report contains the GoDaddy record table grouped by type, the
    Mailjet sender status, whether the Zimbra sender transport map mentions
    the domain, and a short MX/SPF/DKIM/DMARC analysis.

    Args:
        domain: Domain to inspect.

    Returns:
        ``False`` when the DNS records could not be fetched, ``True``
        otherwise (including for a zone with no records).
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"DOMAIN INFORMATION: {domain}")
    print(banner)

    print("📋 Fetching DNS records from GoDaddy...")
    dns_records = self.get_godaddy_dns_records(domain)
    # Only None signals a failed fetch; an empty zone is still reportable.
    if dns_records is None:
        print("✗ Could not retrieve DNS records")
        return False

    print("📧 Checking Mailjet status...")
    mailjet_status = self.check_mailjet_domain_status(domain) or {}

    # --- DNS record table, one group per record type ---
    rule = "-" * 80
    print(f"\n🌐 DNS RECORDS ({len(dns_records)} total)")
    print(rule)
    print(f"{'TYPE':<8} {'NAME':<25} {'VALUE':<35} {'TTL':<8}")
    print(rule)
    records_by_type = {}
    for record in dns_records:
        records_by_type.setdefault(record.get('type', 'UNKNOWN'), []).append(record)
    for position, record_type in enumerate(sorted(records_by_type)):
        if position:
            print(rule)  # separator between type groups
        for record in records_by_type[record_type]:
            name = record.get('name', '@')
            value = str(record.get('data', ''))
            ttl = record.get('ttl', '')
            if len(value) > 35:
                value = value[:32] + "..."
            print(f"{record_type:<8} {name:<25} {value:<35} {ttl:<8}")

    # --- Mailjet ---
    print("\n📧 MAILJET STATUS")
    print("-" * 40)
    if mailjet_status.get('exists'):
        print("✅ Domain registered in Mailjet")
        print(f" Senders: {mailjet_status.get('count', 0)}")
        for sender in mailjet_status.get('senders', []):
            status = sender.get('Status', 'Unknown')
            email = sender.get('Email', 'Unknown')
            print(f" - {email}: {status}")
    else:
        print("❌ Domain not found in Mailjet")
        if 'error' in mailjet_status:
            print(f" Error: {mailjet_status['error']}")

    # --- Zimbra transport map ---
    print("\n🚛 TRANSPORT CONFIGURATION")
    print("-" * 40)
    try:
        with open('/opt/zimbra/conf/sender_transport_maps', 'r') as f:
            transport_maps = f.read()
        if f"@{domain} mailjet-api:" in transport_maps:
            print("✅ Domain configured for Mailjet transport")
        else:
            print("❌ Domain not in transport maps")
    except Exception as e:
        print(f"⚠️ Could not check transport maps: {e}")

    # --- Mail-related record analysis ---
    print("\n🔍 EMAIL CONFIGURATION ANALYSIS")
    print("-" * 40)
    txt_records = [r for r in dns_records if r.get('type') == 'TXT']

    mx_records = [r for r in dns_records if r.get('type') == 'MX']
    if mx_records:
        print("📬 MX Records found:")
        for mx in mx_records:
            print(f" {mx.get('data', 'Unknown')}")
    else:
        print("⚠️ No MX records found")

    spf_records = [r for r in txt_records if 'v=spf1' in r.get('data', '')]
    if spf_records:
        print("🛡️ SPF Record found:")
        for spf in spf_records:
            spf_data = spf.get('data', '')
            if 'include:spf.mailjet.com' in spf_data:
                print(" ✅ Includes Mailjet SPF")
            if 'mail.latinaloveconnections.com' in spf_data:
                print(" ✅ Includes this server")
            print(f" {spf_data}")
    else:
        print("⚠️ No SPF record found")

    # A DKIM key lives under "<selector>._domainkey"; the v=DKIM1 tag is
    # optional in the value, so match on the name as well.
    dkim_records = [
        r for r in txt_records
        if '_domainkey' in r.get('name', '') or 'v=DKIM1' in r.get('data', '')
    ]
    if dkim_records:
        print(f"🔐 DKIM Records found: {len(dkim_records)}")
        for dkim in dkim_records:
            name = dkim.get('name', '')
            if 'mailjet' in name or 'mj' in name:
                print(f" ✅ Mailjet DKIM: {name}")
            elif 'zimbra' in name or 'mail' in name:
                print(f" ✅ Server DKIM: {name}")
            else:
                print(f" 🔐 DKIM: {name}")
    else:
        print("⚠️ No DKIM records found")

    dmarc_records = [r for r in txt_records if r.get('name') == '_dmarc']
    if dmarc_records:
        print("🔒 DMARC Record found:")
        for dmarc in dmarc_records:
            print(f" {dmarc.get('data', '')}")
    else:
        print("⚠️ No DMARC record found")

    print(f"\n{banner}")
    return True
def get_existing_dmarc(self, domain: str) -> Optional[str]:
    """Return the value of the zone's current ``_dmarc`` TXT record, if any.

    Args:
        domain: Zone to read.

    Returns:
        The ``data`` field of the first ``TXT``/``_dmarc`` record, or
        ``None`` when there is none, the API does not answer 200, or an
        exception is raised.
    """
    try:
        response = requests.get(
            f"{GODADDY_BASE_URL}/domains/{domain}/records/TXT/_dmarc",
            headers=self.godaddy_headers,
            timeout=30,  # never wait forever on the DNS API
        )
        if response.status_code != 200:
            return None
        records = response.json()
        if not records:
            return None
        return records[0].get('data', '')
    except Exception as e:
        print(f"⚠️ Could not check existing DMARC: {e}")
        return None
def main():
    """Parse the command line and run the requested operation.

    Modes:
        no flag          print a read-only report for the domain
        --setup-mx       configure the domain for direct delivery to this server
        --setup-mailjet  configure the domain for Mailjet delivery

    Exits with status 1 on an invalid domain, on conflicting flags, or when
    the selected operation reports failure.
    """
    import re  # local import: only this CLI validation needs it

    parser = argparse.ArgumentParser(
        description='Domain Management Script for Mailjet and GoDaddy Integration',
        epilog="""
Examples:
  python3 domain-manager.py amor.global
      Show DNS, Mailjet and transport status for amor.global (no changes)
  python3 domain-manager.py --setup-mx amor.global
      Set up amor.global to use this server as primary mail server
  python3 domain-manager.py --setup-mailjet amor.global
      Set up amor.global for Mailjet delivery (with transport)
""",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('domain', help='Domain name to configure')
    parser.add_argument('--setup-mx', action='store_true',
                        help='Setup domain to use this server as primary mail server (MX + SPF + DKIM + DMARC)')
    parser.add_argument('--setup-mailjet', action='store_true',
                        help='Setup domain for Mailjet delivery (adds to Mailjet + DNS + transport maps)')
    args = parser.parse_args()
    domain = args.domain.strip()

    # The domain ends up in URLs, config files and shell commands, so accept
    # only dot-separated hostname labels (letters, digits, inner hyphens).
    label = r'[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?'
    if len(domain) > 253 or not re.fullmatch(rf'{label}(?:\.{label})+', domain):
        print("✗ Invalid domain format")
        sys.exit(1)

    if args.setup_mx and args.setup_mailjet:
        print("✗ Cannot use both --setup-mx and --setup-mailjet. Choose one.")
        sys.exit(1)

    manager = DomainManager()

    if args.setup_mx:
        # Direct delivery: this server becomes the primary mail server.
        if manager.setup_mx_only(domain):
            print(f"\n🎉 Successfully configured {domain} for direct mail server delivery!")
        else:
            print(f"\n❌ Failed to configure {domain} for mail server")
            sys.exit(1)
    elif args.setup_mailjet:
        # Mailjet delivery: sender registration, DNS and transport maps.
        if manager.process_mailjet_domain(domain):
            print(f"\n🎉 Successfully configured {domain} for Mailjet delivery!")
        else:
            print(f"\n❌ Failed to configure {domain} for Mailjet")
            sys.exit(1)
    else:
        # No flag: read-only report.
        if manager.display_domain_info(domain):
            print("\n💡 Use --setup-mx or --setup-mailjet to configure this domain")
        else:
            print("\n❌ Failed to retrieve domain information")
            sys.exit(1)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment