Skip to content

Instantly share code, notes, and snippets.

@overflowy
Created October 13, 2025 21:38
Show Gist options
  • Select an option

  • Save overflowy/0e24f100b8e2cf487da504ce29b93351 to your computer and use it in GitHub Desktop.

Select an option

Save overflowy/0e24f100b8e2cf487da504ce29b93351 to your computer and use it in GitHub Desktop.
# /// script
# dependencies = [
# "loguru>=0.7.3",
# "openpyxl>=3.1.5",
# "safe-result>=4.0.3",
# ]
# ///
import re
import shutil
import unicodedata
from pathlib import Path
import openpyxl
from loguru import logger
from safe_result import ok, safe, traceback_of
# Input locations: the two spreadsheets live under DATA_DIR, the physical
# document files under FILES_DIR.
DATA_DIR = Path("Dati")
FILES_DIR = Path("DOCs")
ANAGRAFICHE_EXCEL_FILE = DATA_DIR.joinpath("Anagrafiche_Persone.xlsx")
FILES_EXCEL_FILE = DATA_DIR.joinpath("Documenti.xlsx")

# Output locations: one sub-folder per person is created under OUTPUT_DIR,
# and the run log is written to OUTPUT_LOG.
OUTPUT_DIR = Path("OUTPUT_ANAGRAFICHE")
OUTPUT_LOG = Path("output_anagrafiche.log")

# Patterns used by sanitize_name(): the first matches the characters
# < > : " / \ | ? * and the control characters 0x00-0x1f, the second matches
# any run of underscores.
_INVALID_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
_MULTIPLE_UNDERSCORES = re.compile(r"_+")
def setup_script():
    """Prepare the run: reset the log file, configure logging, create OUTPUT_DIR.

    The previous log file is removed on a best-effort basis, loguru is
    reconfigured to write only to OUTPUT_LOG, and the output directory is
    created if it does not exist yet.
    """
    # Remove the old log directly instead of exists()+unlink(), which is racy.
    # A failure here must not abort the job, but it is reported rather than
    # silently swallowed.
    try:
        OUTPUT_LOG.unlink()
    except FileNotFoundError:
        pass  # nothing to clean up
    except OSError as exc:
        print(f"Could not remove previous log {OUTPUT_LOG}: {exc}")

    # remove() without an id drops every installed handler; remove(0) raises
    # ValueError when handler 0 is already gone (e.g. on a second call).
    logger.remove()
    logger.add(OUTPUT_LOG, level="DEBUG", format="<green>{level}</green>: {message}")
    OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
def sanitize_name(name, max_length=100, replacement="_"):
    """Return *name* reduced to an ASCII string usable as a file or folder name.

    Args:
        name: Text to sanitise (must be a ``str``).
        max_length: Maximum length of the returned string.
        replacement: Text substituted for every character matched by
            ``_INVALID_CHARS``. May be empty, in which case invalid characters
            are simply dropped.

    Returns:
        The sanitised name. It may be empty when nothing usable remains, so
        callers must handle that case.
    """
    # NFKD decomposes accented letters into base letter + combining mark; the
    # ASCII round-trip then drops the marks and any other non-ASCII character.
    name = unicodedata.normalize("NFKD", name)
    name = name.encode("ASCII", "ignore").decode("ASCII")

    # Remove or replace invalid characters
    name = _INVALID_CHARS.sub(replacement, name)

    # Replace multiple consecutive replacement characters
    if replacement == "_":
        name = _MULTIPLE_UNDERSCORES.sub(replacement, name)
    elif replacement:
        # Group the escaped text so a multi-character replacement is collapsed
        # as a whole ("abab" -> "ab"), not just its last character repeated.
        # An empty replacement has nothing to collapse and would otherwise
        # produce the invalid pattern "+".
        name = re.sub(f"(?:{re.escape(replacement)})+", replacement, name)

    # Strip dots, spaces and replacement characters from both ends in one pass
    # so that no mix of them (e.g. "name._") survives at an edge.
    edge_chars = f"{replacement}. "
    name = name.strip(edge_chars)

    # Truncate to max length; the cut may expose a new trailing space, dot or
    # replacement character, so trim the end again.
    if len(name) > max_length:
        name = name[:max_length].rstrip(edge_chars)
    return name
@safe
def prepare_files_list():
    """Index the files found directly inside FILES_DIR by upper-cased stem.

    Returns:
        A dict mapping ``path.stem.upper()`` to the file's ``Path``. When
        several files share the same stem, the one yielded last by ``glob``
        wins. main() checks the decorated call's result with ``ok()`` and reads
        the dict from ``.value``.
    """
    print("Preparing files list...")
    # "*.*" also matches directories whose name contains a dot. They cannot be
    # copied as documents, so only regular files are indexed.
    return {
        path.stem.upper(): path
        for path in FILES_DIR.glob("*.*")
        if path.is_file()
    }
@safe
def prepare_people_map():
    """Load ANAGRAFICHE_EXCEL_FILE and index its rows by the ``Id`` column.

    Returns:
        A dict mapping each ``Id`` value to a dict with the keys ``nome``,
        ``cognome``, ``tipo_persona``, ``ragione_sociale``, ``codice_fiscale``
        and ``partita_iva``. A later row with the same ``Id`` replaces an
        earlier one.

    Raises:
        ValueError: If the workbook has no active sheet or a required header
            is missing from the first row. main() checks the decorated call's
            result with ``ok()``.
    """
    print("Preparing people map...")
    anagrafiche_wb = openpyxl.load_workbook(ANAGRAFICHE_EXCEL_FILE, read_only=True)
    # A read-only workbook keeps the file open until close() is called, so the
    # handle is released on every exit path.
    try:
        anagrafiche_ws = anagrafiche_wb.active
        if not anagrafiche_ws:
            raise ValueError("Anagrafiche_Persone.xlsx is empty or corrupted")

        # Resolve column positions from the header row once.
        headers = [cell.value for cell in anagrafiche_ws[1]]
        id_idx = headers.index("Id")
        field_columns = {
            "nome": headers.index("Nome"),
            "cognome": headers.index("Cognome"),
            "tipo_persona": headers.index("TipoPersona"),
            "ragione_sociale": headers.index("RagioneSociale"),
            "codice_fiscale": headers.index("CodiceFiscale"),
            "partita_iva": headers.index("PartitaIva"),
        }

        people_map = {}
        for row in anagrafiche_ws.iter_rows(min_row=2, values_only=True):
            people_map[row[id_idx]] = {
                field: row[idx] for field, idx in field_columns.items()
            }
        return people_map
    finally:
        anagrafiche_wb.close()
@safe
def prepare_person_documents_map():
    """Load FILES_EXCEL_FILE and group its document rows by person id.

    Only rows whose ``FileExcel`` cell equals ``"Anagrafiche_Persone"`` are
    kept. Rows with an empty ``IstanzaId``, ``NomeDocFisico`` or ``NomeDoc``
    are skipped.

    Returns:
        A dict mapping each ``IstanzaId`` value to a list of dicts with the
        keys ``nome_doc_fisico`` and ``nome_doc``, in sheet order.

    Raises:
        ValueError: If the workbook has no active sheet or a required header
            is missing from the first row. main() checks the decorated call's
            result with ``ok()``.
    """
    print("Preparing person -> documents map...")
    documenti_wb = openpyxl.load_workbook(FILES_EXCEL_FILE, read_only=True)
    # A read-only workbook keeps the file open until close() is called, so the
    # handle is released on every exit path.
    try:
        documenti_ws = documenti_wb.active
        if not documenti_ws:
            raise ValueError("Documenti.xlsx is empty or corrupted")

        headers = [cell.value for cell in documenti_ws[1]]
        file_excel_idx = headers.index("FileExcel")
        istanza_id_idx = headers.index("IstanzaId")
        nome_doc_fisico_idx = headers.index("NomeDocFisico")
        nome_doc_idx = headers.index("NomeDoc")

        person_documents_map = {}
        for row in documenti_ws.iter_rows(min_row=2, values_only=True):
            # Only process documents linked to Anagrafiche_Persone
            if row[file_excel_idx] != "Anagrafiche_Persone":
                continue
            person_id = row[istanza_id_idx]
            nome_doc_fisico = row[nome_doc_fisico_idx]
            nome_doc = row[nome_doc_idx]
            if not (person_id and nome_doc_fisico and nome_doc):
                continue
            person_documents_map.setdefault(person_id, []).append(
                {
                    "nome_doc_fisico": nome_doc_fisico,
                    "nome_doc": nome_doc,
                }
            )
        return person_documents_map
    finally:
        documenti_wb.close()
def build_folder_name(person):
    """Build the sanitised output folder name for one person record.

    Args:
        person: Dict with the keys ``tipo_persona``, ``nome``, ``cognome``,
            ``ragione_sociale``, ``codice_fiscale`` and ``partita_iva``
            (missing or empty values are tolerated).

    Returns:
        ``"<name> - <tax code>"`` passed through sanitize_name(), or just the
        name when no tax code is available. For ``tipo_persona == "F"`` the
        name is ``nome cognome`` and the tax code is ``codice_fiscale``.
        Otherwise the name is ``ragione_sociale`` and the tax code is
        ``partita_iva``, falling back to ``codice_fiscale``. The result is
        never empty: ``"Unknown"`` is used when nothing survives sanitising.
    """

    def clean_tax_code(code):
        # Strip underscore prefix from tax codes. Spreadsheet cells may hold
        # numbers, so the value is converted to text before stripping.
        if not code:
            return ""
        return str(code).lstrip("_")

    if person.get("tipo_persona") == "F":
        # Individual
        name_parts = [
            str(part)
            for part in (person.get("nome"), person.get("cognome"))
            if part
        ]
        label = " ".join(name_parts) or "Unknown"
        tax_code = clean_tax_code(person.get("codice_fiscale"))
    else:
        # Company (TipoPersona='G') or other
        label = str(person.get("ragione_sociale") or "Unknown")
        # Prefer PartitaIva, fallback to CodiceFiscale
        tax_code = clean_tax_code(person.get("partita_iva")) or clean_tax_code(
            person.get("codice_fiscale")
        )

    folder_name = f"{label} - {tax_code}" if tax_code else label
    # An empty folder name would place files directly in the output root.
    return sanitize_name(folder_name) or "Unknown"
@safe
def process_file(file_id, origin_path, person_id, people_map, documents):
    """Copy one physical file into its person's folder under OUTPUT_DIR.

    Args:
        file_id: The document's ``nome_doc_fisico`` value, used for lookup and
            in log messages.
        origin_path: Path of the source file to copy.
        person_id: Key of the owning person in *people_map*.
        people_map: Dict of person records as built by prepare_people_map().
        documents: List of document dicts (``nome_doc_fisico``, ``nome_doc``)
            belonging to the person; the first entry whose ``nome_doc_fisico``
            equals *file_id* supplies the destination file name.

    Returns:
        None. Problems with the input data are logged as warnings and the file
        is skipped. main() checks the decorated call's result with ``ok()``.
    """
    person = people_map.get(person_id)
    if not person:
        logger.warning(f"{file_id}: Person not found: {person_id}")
        return
    folder_name = build_folder_name(person)

    # Find document details
    doc = next((d for d in documents if d["nome_doc_fisico"] == file_id), None)
    if not doc:
        logger.warning(f"{file_id}: Document details not found")
        return

    # Spreadsheet cells may hold numbers or dates, while sanitize_name() only
    # accepts text, so the value is converted first.
    # NOTE(review): sanitize_name() truncates to its max_length, which could cut
    # off a file extension on very long names - confirm whether that matters.
    file_name = sanitize_name(str(doc["nome_doc"]))
    if not file_name:
        logger.warning(f"File {file_id} has no `nome_doc`, using {origin_path.name}")
        file_name = origin_path.name

    destination_path = OUTPUT_DIR / folder_name / file_name
    destination_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy(origin_path, destination_path)
    logger.info(f"{file_id}: OK")
def main():
    """Run the whole job: load the inputs, then copy every listed document.

    The three preparation steps each return a result object that is checked
    with ``ok()``. The first failure is logged and ends the job. Each document
    is then looked up in the file index by its upper-cased ``nome_doc_fisico``.
    Missing files are logged as warnings, copy failures are logged with their
    traceback, and a progress percentage is printed after every document.
    """
    setup_script()
    print("Job started")

    files_list = prepare_files_list()
    if not ok(files_list):
        logger.error(files_list)
        return
    people_map = prepare_people_map()
    if not ok(people_map):
        logger.error(people_map)
        return
    person_documents_map = prepare_person_documents_map()
    if not ok(person_documents_map):
        logger.error(person_documents_map)
        return

    files_by_key = files_list.value
    people = people_map.value
    documents_by_person = person_documents_map.value

    # Count total files to process
    total_files = sum(len(docs) for docs in documents_by_person.values())
    processed_files = 0
    for person_id, documents in documents_by_person.items():
        for doc in documents:
            file_id = doc["nome_doc_fisico"]
            # A spreadsheet cell is not guaranteed to hold text; converting it
            # keeps a numeric id from crashing the whole job on .upper().
            file_path = files_by_key.get(str(file_id).upper())
            if file_path is None:
                logger.warning(f"{file_id}: File not found in {FILES_DIR}")
            else:
                result = process_file(file_id, file_path, person_id, people, documents)
                if not ok(result):
                    logger.error(f"{file_id}:\n{traceback_of(result)}")

            # Refresh the progress line for every document, including the
            # ones that were skipped because the file is missing.
            processed_files += 1
            percentage = (processed_files / total_files) * 100
            print(f"Copying files: {percentage:.2f}%", end="\r")
    print("\nJob completed")


# Run the job only when the file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment