Skip to content

Instantly share code, notes, and snippets.

@manuelep
Created May 18, 2026 14:52
Show Gist options
  • Select an option

  • Save manuelep/9b5ce3c3a92626d592be6969cc02e460 to your computer and use it in GitHub Desktop.

Select an option

Save manuelep/9b5ce3c3a92626d592be6969cc02e460 to your computer and use it in GitHub Desktop.
mmigrating tool from Mariadb to PostgreSQL

Migration Tool - MariaDB → PostgreSQL via SSH

Script Python avanzato per migrare dati da MariaDB a PostgreSQL attraverso un tunnel SSH, con support per field mapping, key-based upsert (INSERT/UPDATE) e dry-run mode.

📋 Requisiti

pip install mysql-connector-python psycopg2-binary paramiko sshtunnel

🚀 Quick Start

1. Genera configurazione template

python migrate_db.py --create-config config.json

Questo crea un file config.json con i valori di default da compilare.

2. Edita config.json con le tue credenziali

{
  "source": {
    "host": "mysql.example.com",
    "port": 3306,
    "user": "mysql_user",
    "password": "mysql_password",
    "database": "source_db",
    "table": "users",
    "fields": ["id", "email", "name", "surname"]
  },
  "ssh": {
    "enabled": true,
    "host": "bastion.example.com",
    "port": 22,
    "username": "ssh_user",
    "password": null,
    "key_file": "/path/to/private_key.pem",
    "key_passphrase": null
  },
  "destination": {
    "host": "postgres.internal.example.com",
    "port": 5432,
    "user": "postgres_user",
    "password": "postgres_password",
    "database": "destination_db",
    "table": "users",
    "key_field": "email",
    "field_mapping": {
      "id": "user_id",
      "email": "email",
      "name": "first_name",
      "surname": "last_name"
    }
  },
  "dry_run": true,
  "batch_size": 100
}

3. Esegui in modalità DRY-RUN (consigliato prima)

python migrate_db.py --config config.json --dry-run

Questo non modifica nulla e genera un report nel log con:

  • ✓ Chiavi trovate in destination (verranno aggiornate)
  • ✗ Chiavi nuove (verranno inserite)
  • Valori di mapping per il primo record
  • Statistiche complesse

Esempio output dry-run:

================================================================================
INIZIO MIGRAZIONE - Modalità: DRY-RUN
================================================================================
✓ Letti 1250 record da source
✓ Trovate 823 chiavi esistenti in destination

[DRY-RUN 1/1250] INSERT record chiave='user1@example.com'
[DRY-RUN 2/1250] UPDATE record chiave='existing@example.com'
[DRY-RUN 3/1250] INSERT record chiave='user3@example.com'
...

================================================================================
SUMMARY MIGRAZIONE
================================================================================
Record letti:    1250
Record inseriti: 427
Record aggiornati: 823
Record saltati:  0
Errori:          0
================================================================================

Chiavi INSERT: 427
Chiavi UPDATE: 823

DETTAGLIO CHIAVI (DRY-RUN):
  existing@example.com: ✓ ESISTE → UPDATE
  user1@example.com: ✗ NUOVO → INSERT
  user3@example.com: ✗ NUOVO → INSERT
  ...

4. Esegui migrazione reale

Una volta soddisfatto del dry-run:

python migrate_db.py --config config.json

Oppure rimuovi la flag "dry_run": true dal config e esegui senza --dry-run.

🔧 Configurazione Dettagliata

source (MariaDB)

Campo Descrizione Default
host Hostname MariaDB localhost
port Porta MariaDB 3306
user Username MySQL root
password Password MySQL password
database Nome database source source_db
table Nome tabella source source_table
fields Campi da esportare ([] = tutti) []

Esempio: esportare solo alcuni campi

"fields": ["id", "email", "name", "phone"]

ssh (Tunnel SSH)

Campo Descrizione
enabled Abilita tunnel SSH (true/false)
host Hostname del server SSH
port Porta SSH
username Username SSH
password Password SSH (null se usi chiave)
key_file Path alla chiave privata SSH
key_passphrase Passphrase della chiave (se encrypted)

Opzioni di autenticazione SSH:

{
  "enabled": true,
  "host": "bastion.example.com",
  "port": 22,
  "username": "ssh_user",
  "password": "ssh_password",
  "key_file": null,
  "key_passphrase": null
}

Oppure con chiave privata:

{
  "enabled": true,
  "host": "bastion.example.com",
  "port": 22,
  "username": "ssh_user",
  "password": null,
  "key_file": "/home/user/.ssh/id_rsa",
  "key_passphrase": "passphrase_della_chiave"
}

Se PostgreSQL è accessibile direttamente (no tunnel):

{
  "enabled": false
}

destination (PostgreSQL)

Campo Descrizione
host Hostname PostgreSQL
port Porta PostgreSQL
user Username PostgreSQL
password Password PostgreSQL
database Database destination
table Tabella destination
key_field Campo chiave per upsert (es. email, id)
field_mapping Mappa source → destination

Field Mapping:

Mappa i nomi dei campi source ai nomi in destination. Se omesso, assume stessi nomi:

"field_mapping": {
  "id": "user_id",
  "email": "email_address",
  "name": "first_name",
  "surname": "last_name"
}

Upsert Logic (INSERT vs UPDATE)

Lo script usa il key_field per determinare se fare INSERT o UPDATE:

  1. Legge il valore del key_field dal record source
  2. Verifica se esiste già in destination con quel valore di key_field
  3. Se esiste → UPDATE (aggiorna tutti i campi)
  4. Se non esiste → INSERT (inserisce nuovo record) OPPURE SKIP (se skip_insert: true)

Esempio (skip_insert: false - default):

Source record: {id: 123, email: 'john@example.com', name: 'John', surname: 'Doe'}
key_field: 'email'
key_value: 'john@example.com'

→ Se 'john@example.com' esiste in destination → UPDATE
→ Se 'john@example.com' NON esiste → INSERT (nuovo record)

Esempio (skip_insert: true - solo UPDATE):

Source record: {id: 123, email: 'john@example.com', name: 'John', surname: 'Doe'}
key_field: 'email'
key_value: 'john@example.com'

→ Se 'john@example.com' esiste in destination → UPDATE
→ Se 'john@example.com' NON esiste → SKIP (non inserito)

skip_insert (boolean)

Se true, salta gli INSERT - inserisce solo record su chiavi esatte che già esistono.

Uso case: Quando vuoi sincronizzare dati senza aggiungere nuovi record in destination.

"skip_insert": true

Default: false (abilita sia INSERT che UPDATE)

📊 Log e Report

I log sono salvati in migration.log (o file specificato con --log-file).

Ogni run contiene:

  • Connection logs: dettagli connessioni source/destination/SSH
  • Row-by-row logs: action (INSERT/UPDATE) per ogni record
  • Summary: statistiche finali
  • Key details: elenco di chiavi con azione prevista

Specifica file di log custom:

python migrate_db.py --config config.json --log-file migration_2024_01_15.log

🎯 Casi d'Uso

Caso 1: Sincronizzazione Utenti

Source: users table in MariaDB Destination: accounts table in PostgreSQL Key field: email (univoco per utente)

{
  "source": {
    "table": "users",
    "fields": ["email", "first_name", "last_name", "phone"]
  },
  "destination": {
    "table": "accounts",
    "key_field": "email",
    "field_mapping": {
      "email": "email_address",
      "first_name": "fname",
      "last_name": "lname"
    }
  }
}

Caso 2: Migrazione Prodotti con ID

Source: products in MariaDB Destination: products in PostgreSQL Key field: product_id (ID univoco prodotto)

{
  "source": {
    "table": "products",
    "fields": ["product_id", "name", "price", "stock"]
  },
  "destination": {
    "table": "products",
    "key_field": "product_id",
    "field_mapping": {
      "product_id": "id",
      "name": "title"
    }
  }
}

Caso 3: Update-only (senza INSERT)

Source: users in MariaDB Destination: accounts in PostgreSQL Key field: email Solo aggiornamenti, nessun nuovo record

{
  "source": {
    "table": "users",
    "fields": ["email", "last_name", "phone", "updated_at"]
  },
  "destination": {
    "table": "accounts",
    "key_field": "email",
    "field_mapping": {
      "email": "email_address",
      "last_name": "surname"
    }
  },
  "skip_insert": true
}

Con questa config:

  • Solo i record con email già presente in accounts vengono aggiornati
  • I record source con email non trovata vengono saltati

⚠️ Note Importanti

  1. Dry-run sempre prima: Sempre esegui --dry-run prima della migrazione reale
  2. Backup: Fai backup di destination prima di migrare in live
  3. Transazioni: Ogni batch (default 100 righe) viene committed separatamente
  4. Null values: NULL sono preservati nella migrazione
  5. Type conversion: I tipi vengono convertiti automaticamente (MySQL types → PostgreSQL types)
  6. SSH key permissions: La chiave SSH deve avere permessi 600 (chmod 600 ~/.ssh/id_rsa)

🐛 Troubleshooting

Errore: "SSH key file not found"

Verifica il path della chiave privata. Deve essere path assoluto:
❌ "key_file": "~/.ssh/id_rsa"
✅ "key_file": "/home/username/.ssh/id_rsa"

Errore: "Connection refused" su SSH

Verifica che:
- Host SSH sia raggiungibile
- Porta 22 sia aperta (o la port custom configurata)
- Username e password/key siano corretti

Errore: "Column does not exist"

Verifica che:
- Il campo destinazione esista nella tabella
- Il field_mapping sia corretto
- Gli accenti/maiuscole siano coerenti (PostgreSQL è case-sensitive)

Errore: "Foreign key constraint"

Se destination ha FK constraints, potrebbero conflittare con l'upsert. Soluzioni:

  1. Disabilita FK temporaneamente: SET CONSTRAINTS ALL DEFERRED;
  2. Ordina i dati per rispettare le FK
  3. Usa --dry-run per analizzare gli errori

📝 Esempi Completi

Migrazione di test (localhost)

# Gen config
python migrate_db.py --create-config test_config.json

# Edita test_config.json:
# - source: localhost MySQL con user=root, db=test_source, table=users
# - destination: localhost PostgreSQL con user=postgres, db=test_dest, table=users
# - ssh: disabled

# Dry-run
python migrate_db.py --config test_config.json --dry-run

# Real migration
python migrate_db.py --config test_config.json

Migrazione in produzione con SSH

# Gen config
python migrate_db.py --create-config prod_config.json

# Edita prod_config.json con credenziali reali

# Dry-run (non modifica nulla)
python migrate_db.py --config prod_config.json --dry-run

# Verifica il log
tail -f migration.log

# Se soddisfatto, esegui live (modifica il config: "dry_run": false)
python migrate_db.py --config prod_config.json --log-file migration_prod_$(date +%Y%m%d_%H%M%S).log

📞 Support

Errori comuni e soluzioni:

  • PSQLError: Verifica credenziali PostgreSQL e SSH tunnel
  • MySQLError: Verifica credenziali MariaDB e che la tabella/campi esistano
  • paramiko errors: Verifica SSH key permissions e formato

Creato per Manuele @ Gter srl - 2024

#!/usr/bin/env python3
"""
Script di migrazione dati da MariaDB a PostgreSQL via SSH tunnel.
Supporta field mapping, key-based upsert (INSERT/UPDATE) e dry-run.
Usa:
python migrate_db.py --config config.json --dry-run
python migrate_db.py --config config.json
"""
import argparse
import json
import sys
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List, Any, Tuple
from dataclasses import dataclass, asdict
import mysql.connector
from mysql.connector import Error as MySQLError
import psycopg2
from psycopg2 import sql
import paramiko
from paramiko import SSHClient, AutoAddPolicy
from sshtunnel import SSHTunnelForwarder
# ============================================================================
# LOGGING SETUP
# ============================================================================
def setup_logging(log_file: str = "migration.log") -> logging.Logger:
"""Configura il logging su file e console."""
logger = logging.getLogger("MigrationTool")
logger.setLevel(logging.DEBUG)
# Handler file
fh = logging.FileHandler(log_file, encoding='utf-8')
fh.setLevel(logging.DEBUG)
# Handler console
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
# Formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
# ============================================================================
# DATA CLASSES
# ============================================================================
@dataclass
class SourceConfig:
"""Configurazione source MariaDB."""
host: str = "localhost"
port: int = 3306
user: str = "root"
password: str = "password"
database: str = "source_db"
table: str = "source_table"
fields: List[str] = None # ['id', 'email', 'name']
def __post_init__(self):
if self.fields is None:
self.fields = []
@dataclass
class SSHConfig:
"""Configurazione SSH tunnel."""
enabled: bool = True
host: str = "ssh.example.com"
port: int = 22
username: str = "sshuser"
password: Optional[str] = None
key_file: Optional[str] = None # Path to private key
key_passphrase: Optional[str] = None
@dataclass
class DestinationConfig:
"""Configurazione destination PostgreSQL."""
host: str = "localhost"
port: int = 5432
user: str = "postgres"
password: str = "password"
database: str = "destination_db"
table: str = "destination_table"
key_field: str = "email" # Campo di riconoscimento per upsert
# Field mapping: source_field -> destination_field
field_mapping: Dict[str, str] = None # {'email': 'user_email', 'name': 'full_name'}
def __post_init__(self):
if self.field_mapping is None:
self.field_mapping = {}
@dataclass
class MigrationConfig:
"""Configurazione completa della migrazione."""
source: SourceConfig
ssh: SSHConfig
destination: DestinationConfig
dry_run: bool = True
batch_size: int = 100
skip_insert: bool = False # Se True, salta gli INSERT (solo UPDATE)
@classmethod
def from_json(cls, filepath: str) -> 'MigrationConfig':
"""Carica configurazione da file JSON."""
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
source = SourceConfig(**data.get('source', {}))
ssh = SSHConfig(**data.get('ssh', {}))
destination = DestinationConfig(**data.get('destination', {}))
return cls(
source=source,
ssh=ssh,
destination=destination,
dry_run=data.get('dry_run', True),
batch_size=data.get('batch_size', 100),
skip_insert=data.get('skip_insert', False)
)
def to_json(self, filepath: str) -> None:
"""Salva configurazione su file JSON."""
data = {
'source': asdict(self.source),
'ssh': asdict(self.ssh),
'destination': asdict(self.destination),
'dry_run': self.dry_run,
'batch_size': self.batch_size,
'skip_insert': self.skip_insert
}
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# ============================================================================
# MIGRATION ENGINE
# ============================================================================
class DatabaseMigrator:
"""Engine principale per la migrazione dati."""
def __init__(self, config: MigrationConfig, logger: logging.Logger):
self.config = config
self.logger = logger
self.source_conn = None
self.dest_conn = None
self.ssh_tunnel = None
self.stats = {
'rows_read': 0,
'rows_inserted': 0,
'rows_updated': 0,
'rows_skipped': 0,
'errors': 0,
'key_matches': {}, # {key_value: {'found': bool, 'action': 'INSERT|UPDATE'}}
}
def connect_source(self) -> bool:
"""Connette al database source MariaDB."""
try:
self.logger.info(f"Connessione a MariaDB: {self.config.source.host}:{self.config.source.port}/{self.config.source.database}")
self.source_conn = mysql.connector.connect(
host=self.config.source.host,
port=self.config.source.port,
user=self.config.source.user,
password=self.config.source.password,
database=self.config.source.database
)
self.logger.info("✓ Connesso a MariaDB")
return True
except MySQLError as e:
self.logger.error(f"✗ Errore di connessione MariaDB: {e}")
return False
def connect_destination(self) -> bool:
"""Connette al database destination PostgreSQL (eventualmente via SSH tunnel)."""
try:
# Configura SSH tunnel se necessario
if self.config.ssh.enabled:
self.logger.info(f"Configurazione SSH tunnel: {self.config.ssh.host}:{self.config.ssh.port}")
# Determina il metodo di autenticazione SSH
ssh_auth = {}
if self.config.ssh.key_file:
ssh_auth['ssh_pkey'] = paramiko.RSAKey.from_private_key_file(
self.config.ssh.key_file,
password=self.config.ssh.key_passphrase
)
self.logger.debug(f"Usato SSH key: {self.config.ssh.key_file}")
else:
ssh_auth['ssh_password'] = self.config.ssh.password
self.logger.debug("Usata SSH password")
# Avvia tunnel
self.ssh_tunnel = SSHTunnelForwarder(
(self.config.ssh.host, self.config.ssh.port),
ssh_username=self.config.ssh.username,
**ssh_auth,
remote_bind_address=(self.config.destination.host, self.config.destination.port),
allow_agent=True,
look_for_keys=True
)
self.ssh_tunnel.start()
# PostgreSQL si connette al tunnel
pg_host = '127.0.0.1'
pg_port = self.ssh_tunnel.local_bind_port
self.logger.info(f"✓ SSH tunnel stabilito: localhost:{pg_port} -> {self.config.destination.host}:{self.config.destination.port}")
else:
pg_host = self.config.destination.host
pg_port = self.config.destination.port
self.logger.info(f"Connessione diretta a PostgreSQL (SSH disabilitato)")
# Connessione PostgreSQL
self.logger.info(f"Connessione a PostgreSQL: {pg_host}:{pg_port}/{self.config.destination.database}")
self.dest_conn = psycopg2.connect(
host=pg_host,
port=pg_port,
user=self.config.destination.user,
password=self.config.destination.password,
database=self.config.destination.database
)
self.logger.info("✓ Connesso a PostgreSQL")
return True
except Exception as e:
self.logger.error(f"✗ Errore di connessione PostgreSQL/SSH: {e}")
return False
def disconnect(self) -> None:
"""Chiude tutte le connessioni."""
if self.source_conn:
self.source_conn.close()
self.logger.debug("✓ Disconnesso da MariaDB")
if self.dest_conn:
self.dest_conn.close()
self.logger.debug("✓ Disconnesso da PostgreSQL")
if self.ssh_tunnel:
self.ssh_tunnel.stop()
self.logger.debug("✓ SSH tunnel chiuso")
def fetch_source_data(self) -> List[Dict[str, Any]]:
"""Legge dati da MariaDB."""
try:
cursor = self.source_conn.cursor(dictionary=True)
# Se fields specifici richiesti, filtra
if self.config.source.fields:
fields_str = ', '.join(f"`{f}`" for f in self.config.source.fields)
query = f"SELECT {fields_str} FROM `{self.config.source.table}`"
else:
query = f"SELECT * FROM `{self.config.source.table}`"
self.logger.info(f"Esecuzione query: {query}")
cursor.execute(query)
rows = cursor.fetchall()
self.logger.info(f"✓ Letti {len(rows)} record da source")
self.stats['rows_read'] = len(rows)
cursor.close()
return rows
except MySQLError as e:
self.logger.error(f"✗ Errore lettura source: {e}")
return []
def get_existing_keys(self) -> Dict[str, Any]:
"""Recupera i valori del campo chiave già presenti in destination."""
try:
cursor = self.dest_conn.cursor()
key_field = self.config.destination.key_field
# Mappa al nome reale in destination
dest_key_field = self.config.destination.field_mapping.get(key_field, key_field)
query = f"SELECT {sql.Identifier(dest_key_field).as_string(cursor)} FROM {sql.Identifier(self.config.destination.table).as_string(cursor)}"
self.logger.debug(f"Query existing keys: {query}")
cursor.execute(query)
keys = {row[0]: True for row in cursor.fetchall() if row[0] is not None}
self.logger.info(f"✓ Trovate {len(keys)} chiavi esistenti in destination")
cursor.close()
return keys
except psycopg2.Error as e:
self.logger.warning(f"⚠ Errore lettura chiavi esistenti: {e}")
return {}
def map_row(self, row: Dict[str, Any]) -> Tuple[Dict[str, Any], str]:
"""
Mappa un record source al format destination.
Returns:
(row_mappato, valore_chiave)
"""
key_field = self.config.destination.key_field
key_value = row.get(key_field)
# Applica field mapping
mapped_row = {}
for src_field, value in row.items():
dest_field = self.config.destination.field_mapping.get(src_field, src_field)
mapped_row[dest_field] = value
return mapped_row, key_value
def upsert_row(self, cursor, mapped_row: Dict[str, Any], key_value: str, key_exists: bool) -> str:
"""
Esegue INSERT o UPDATE a seconda se la chiave esiste.
Se skip_insert=True, salta gli INSERT (solo UPDATE).
Returns:
'INSERT' | 'UPDATE' | 'SKIP'
"""
key_field = self.config.destination.key_field
dest_key_field = self.config.destination.field_mapping.get(key_field, key_field)
table = self.config.destination.table
try:
if key_exists:
# UPDATE
set_clause = ', '.join([
f"{sql.Identifier(k).as_string(cursor)} = %s"
for k in mapped_row.keys()
if k != dest_key_field
])
values = [v for k, v in mapped_row.items() if k != dest_key_field]
values.append(key_value)
query = f"UPDATE {sql.Identifier(table).as_string(cursor)} SET {set_clause} WHERE {sql.Identifier(dest_key_field).as_string(cursor)} = %s"
cursor.execute(query, values)
self.stats['rows_updated'] += cursor.rowcount
return 'UPDATE'
else:
# INSERT (saltato se skip_insert=True)
if self.config.skip_insert:
self.logger.debug(f"Record chiave '{key_value}' non trovato, saltato (skip_insert=True)")
self.stats['rows_skipped'] += 1
return 'SKIP'
cols = list(mapped_row.keys())
placeholders = ', '.join(['%s'] * len(cols))
cols_str = ', '.join([sql.Identifier(c).as_string(cursor) for c in cols])
query = f"INSERT INTO {sql.Identifier(table).as_string(cursor)} ({cols_str}) VALUES ({placeholders})"
values = [mapped_row[c] for c in cols]
cursor.execute(query, values)
self.stats['rows_inserted'] += cursor.rowcount
return 'INSERT'
except psycopg2.Error as e:
self.logger.error(f"✗ Errore upsert chiave '{key_value}': {e}")
self.stats['errors'] += 1
return 'SKIP'
def run_migration(self) -> bool:
"""Esegue la migrazione (o dry-run)."""
if not self.connect_source():
return False
if not self.connect_destination():
self.disconnect()
return False
try:
self.logger.info("=" * 80)
self.logger.info(f"INIZIO MIGRAZIONE - Modalità: {'DRY-RUN' if self.config.dry_run else 'LIVE'}")
self.logger.info("=" * 80)
# Leggi dati source
source_rows = self.fetch_source_data()
if not source_rows:
self.logger.warning("⚠ Nessun record da migrare")
return True
# Recupera chiavi esistenti in destination
existing_keys = self.get_existing_keys()
# Prepara cursore destination
dest_cursor = self.dest_conn.cursor()
# Processa righe
self.logger.info(f"Inizio elaborazione {len(source_rows)} record...")
for i, row in enumerate(source_rows, 1):
mapped_row, key_value = self.map_row(row)
key_exists = key_value in existing_keys if key_value else False
# Log mapping
action = 'UPDATE' if key_exists else 'INSERT'
# Se skip_insert=True e chiave non esiste → azione diventa SKIP
if self.config.skip_insert and not key_exists:
action = 'SKIP'
self.logger.debug(f"[{i}] Chiave '{key_value}': {action}")
if key_value not in self.stats['key_matches']:
self.stats['key_matches'][key_value] = {
'found': key_exists,
'action': action
}
# Dry-run: solo log
if self.config.dry_run:
self.logger.info(f"[DRY-RUN {i}/{len(source_rows)}] {action} record chiave='{key_value}'")
if i == 1: # Log il primo record con i valori
self.logger.debug(f" Valori: {json.dumps(mapped_row, indent=2, default=str)}")
else:
# Live: esegui upsert
self.upsert_row(dest_cursor, mapped_row, key_value, key_exists)
if i % self.config.batch_size == 0:
self.dest_conn.commit()
self.logger.info(f"[{i}/{len(source_rows)}] Commit batch")
# Commit finale (se non dry-run)
if not self.config.dry_run:
self.dest_conn.commit()
self.logger.info("✓ Commit finale")
dest_cursor.close()
# Log summary
self.logger.info("=" * 80)
self.logger.info("SUMMARY MIGRAZIONE")
self.logger.info("=" * 80)
self.logger.info(f"Record letti: {self.stats['rows_read']}")
self.logger.info(f"Record inseriti: {self.stats['rows_inserted']}")
self.logger.info(f"Record aggiornati: {self.stats['rows_updated']}")
self.logger.info(f"Record saltati: {self.stats['rows_skipped']}")
self.logger.info(f"Errori: {self.stats['errors']}")
self.logger.info("=" * 80)
# Analisi chiavi
updates = sum(1 for v in self.stats['key_matches'].values() if v['action'] == 'UPDATE')
inserts = sum(1 for v in self.stats['key_matches'].values() if v['action'] == 'INSERT')
self.logger.info(f"Chiavi INSERT: {inserts}")
self.logger.info(f"Chiavi UPDATE: {updates}")
# Log dettaglio chiavi se dry-run
if self.config.dry_run:
self.logger.info("\nDETTAGLIO CHIAVI (DRY-RUN):")
for key_value, info in sorted(self.stats['key_matches'].items()):
status = "✓ ESISTE" if info['found'] else "✗ NUOVO"
action = info['action']
self.logger.info(f" {key_value}: {status}{action}")
return True
except Exception as e:
self.logger.error(f"✗ Errore fatale: {e}", exc_info=True)
return False
finally:
self.disconnect()
# ============================================================================
# CLI & MAIN
# ============================================================================
def create_default_config() -> MigrationConfig:
"""Crea una configurazione di default."""
return MigrationConfig(
source=SourceConfig(
host="mysql.example.com",
port=3306,
user="mysql_user",
password="mysql_password",
database="source_db",
table="users",
fields=["id", "email", "name", "surname"]
),
ssh=SSHConfig(
enabled=True,
host="bastion.example.com",
port=22,
username="ssh_user",
password=None,
key_file="/path/to/private_key.pem",
key_passphrase=None
),
destination=DestinationConfig(
host="postgres.internal.example.com",
port=5432,
user="postgres_user",
password="postgres_password",
database="destination_db",
table="users",
key_field="email",
field_mapping={
"id": "user_id",
"email": "email",
"name": "first_name",
"surname": "last_name"
}
),
dry_run=True,
batch_size=100,
skip_insert=False
)
def main():
parser = argparse.ArgumentParser(
description='Migrazione dati da MariaDB a PostgreSQL via SSH tunnel',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Esempi:
# Crea configurazione di default
python migrate_db.py --create-config config.json
# Esegui dry-run
python migrate_db.py --config config.json --dry-run
# Esegui migrazione reale
python migrate_db.py --config config.json
# Esegui con log esplicito
python migrate_db.py --config config.json --log-file migration_$(date +%Y%m%d_%H%M%S).log
"""
)
parser.add_argument('--config', help='Path al file di configurazione JSON')
parser.add_argument('--create-config', metavar='OUTPUT_FILE',
help='Crea un file di configurazione di default')
parser.add_argument('--dry-run', action='store_true',
help='Esegui in modalità dry-run (senza modificare destination)')
parser.add_argument('--log-file', default='migration.log',
help='Path file di log (default: migration.log)')
args = parser.parse_args()
# Setup logging
logger = setup_logging(args.log_file)
logger.info(f"Log salvato in: {args.log_file}")
# Crea configurazione di default
if args.create_config:
logger.info(f"Creazione configurazione di default: {args.create_config}")
config = create_default_config()
config.to_json(args.create_config)
logger.info(f"✓ File creato. Edita il file con le tue credenziali.")
print(f"\n✓ File di configurazione creato: {args.create_config}")
print(" Edita il file con le tue credenziali e poi esegui:")
print(f" python migrate_db.py --config {args.create_config} --dry-run")
return 0
# Valida che sia fornito config
if not args.config:
logger.error("✗ --config è obbligatorio (usa --create-config per generare un template)")
parser.print_help()
return 1
# Carica configurazione
if not Path(args.config).exists():
logger.error(f"✗ File di configurazione non trovato: {args.config}")
return 1
try:
config = MigrationConfig.from_json(args.config)
logger.info(f"✓ Configurazione caricata da {args.config}")
except Exception as e:
logger.error(f"✗ Errore caricamento configurazione: {e}")
return 1
# Override dry-run se specificato
if args.dry_run:
config.dry_run = True
# Esegui migrazione
migrator = DatabaseMigrator(config, logger)
success = migrator.run_migration()
return 0 if success else 1
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment