Last active
October 13, 2025 21:39
-
-
Save overflowy/5a9c9eab65a2034eefd07f41906daf1b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = [ | |
| # "loguru>=0.7.3", | |
| # "openpyxl>=3.1.5", | |
| # "safe-result>=4.0.3", | |
| # ] | |
| # /// | |
| import re | |
| import shutil | |
| import unicodedata | |
| from pathlib import Path | |
| import openpyxl | |
| from loguru import logger | |
| from safe_result import ok, safe, traceback_of | |
# Input locations, given as relative paths.
DATA_DIR = Path("Dati")  # folder holding the two Excel workbooks below
FILES_DIR = Path("DOCs")  # folder scanned by prepare_files_list()
FASCICOLI_EXCEL_FILE = DATA_DIR / "Fascicoli_repertori.xlsx"  # columns read: ID, codice, Descrizione
FILES_EXCEL_FILE = DATA_DIR / "Documenti.xlsx"  # columns read: IstanzaId, NomeDocFisico, NomeDoc, isFolder, Cartella

# Output locations, given as relative paths.
OUTPUT_DIR = Path("OUTPUT")  # root of the folder tree written by process_file()
OUTPUT_LOG = Path("output.log")  # log file deleted and re-created by setup_script()

# Characters replaced by sanitize_name(): < > : " / \ | ? * and the control
# characters 0x00-0x1F.
_INVALID_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
# A run of one or more underscores; sanitize_name() collapses each run to one.
_MULTIPLE_UNDERSCORES = re.compile(r"_+")
def setup_script():
    """Reset the log file, configure logging and create the output folder.

    Steps, in order:

    1. Delete ``OUTPUT_LOG`` left over from a previous run (best effort).
    2. Remove every loguru handler currently registered and add one file
       sink writing to ``OUTPUT_LOG`` at ``DEBUG`` level.
    3. Create ``OUTPUT_DIR`` together with any missing parents.
    """
    try:
        OUTPUT_LOG.unlink(missing_ok=True)
    except OSError as exc:
        # Best effort: a stale log that cannot be deleted (locked, read-only,
        # a directory, ...) must not abort the job, but the user should know.
        print(f"Warning: could not remove {OUTPUT_LOG}: {exc}")

    # remove() without an id drops all handlers. remove(0) raises ValueError
    # when handler 0 no longer exists, e.g. if this function runs twice.
    logger.remove()
    logger.add(OUTPUT_LOG, level="DEBUG", format="<green>{level}</green>: {message}")
    OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
def sanitize_name(name, max_length=100, replacement="_"):
    """Turn arbitrary text into a string usable as a file or folder name.

    The value is reduced to ASCII, every character matched by
    ``_INVALID_CHARS`` is replaced by ``replacement``, runs of the replacement
    are collapsed to a single occurrence, the edges are trimmed of dots,
    spaces and replacement characters, and the result is cut to
    ``max_length`` characters.

    Args:
        name: Text to sanitize. ``None`` yields ``""``; any other non-string
            value (for example a number read from a spreadsheet cell) is
            converted with ``str()`` first.
        max_length: Maximum length of the returned string.
        replacement: Text substituted for each invalid character. May be
            empty, in which case invalid characters are simply removed.

    Returns:
        The sanitized name. It may be empty.
    """
    if name is None:
        return ""
    if not isinstance(name, str):
        name = str(name)

    # NFKD splits accented letters into base letter + combining mark; the
    # ASCII round-trip then drops the marks and every other non-ASCII char.
    name = unicodedata.normalize("NFKD", name)
    name = name.encode("ASCII", "ignore").decode("ASCII")

    name = _INVALID_CHARS.sub(replacement, name)

    # Collapse consecutive replacements into one.
    if replacement == "_":
        name = _MULTIPLE_UNDERSCORES.sub(replacement, name)
    elif replacement:
        # The group makes "+" repeat the whole replacement text rather than
        # only its last character. An empty replacement is skipped because
        # there is nothing to collapse and the pattern would not compile.
        name = re.sub(f"(?:{re.escape(replacement)})+", replacement, name)

    # Trim dots, spaces and replacement characters in a single pass so that
    # removing one kind cannot leave another kind exposed at the edge.
    edge_chars = replacement + ". "
    name = name.strip(edge_chars)

    if len(name) > max_length:
        # Cutting can expose a trailing dot, space or replacement again.
        name = name[:max_length].rstrip(edge_chars)
    return name
@safe
def prepare_files_list():
    """Index the regular files found directly in ``FILES_DIR``.

    Returns:
        A dict mapping ``path.stem.upper()`` to the ``Path`` of every entry
        in ``FILES_DIR`` that matches ``*.*`` and is a regular file. When two
        files share the same upper-cased stem, the one yielded later by
        ``glob`` replaces the earlier one. Because of ``@safe`` the caller
        receives a result object and reads the dict from its ``.value``.
    """
    print("Preparing files list...")
    # "*.*" also matches directories whose name contains a dot; those cannot
    # be copied with shutil.copy, so only regular files are kept.
    return {
        entry.stem.upper(): entry
        for entry in FILES_DIR.glob("*.*")
        if entry.is_file()
    }
@safe
def prepare_fascicolo_details_map():
    """Load ``FASCICOLI_EXCEL_FILE`` into a lookup keyed by fascicolo ID.

    The first row of the active sheet is treated as the header and must
    contain the columns ``ID``, ``codice`` and ``Descrizione``.

    Returns:
        A dict mapping each ``ID`` cell to
        ``{"codice": ..., "descrizione": ...}``. A later row with the same ID
        replaces an earlier one. Rows whose ID cell is empty are skipped.
        Because of ``@safe`` the caller receives a result object and reads
        the dict from its ``.value``.

    Raises:
        ValueError: If the workbook has no active sheet or a required column
            is missing from the header row.
    """
    print("Preparing fascicolo -> details map...")
    fascicoli_wb = openpyxl.load_workbook(FASCICOLI_EXCEL_FILE, read_only=True)
    try:
        fascicoli_ws = fascicoli_wb.active
        if not fascicoli_ws:
            raise ValueError("Fascicoli_repertori.xlsx is empty or corrupted")

        headers = [cell.value for cell in fascicoli_ws[1]]
        required = ("ID", "codice", "Descrizione")
        missing = [column for column in required if column not in headers]
        if missing:
            # Same exception type as list.index(), but with a usable message.
            raise ValueError(
                f"{FASCICOLI_EXCEL_FILE}: missing column(s): {', '.join(missing)}"
            )
        id_idx = headers.index("ID")
        codice_idx = headers.index("codice")
        descrizione_idx = headers.index("Descrizione")

        fascicolo_details_map = {}
        for row in fascicoli_ws.iter_rows(min_row=2, values_only=True):
            fascicolo_id = row[id_idx]
            if fascicolo_id is None:
                # Blank row: storing it under the key None would let documents
                # with an empty fascicolo id match a meaningless entry.
                continue
            fascicolo_details_map[fascicolo_id] = {
                "codice": row[codice_idx],
                "descrizione": row[descrizione_idx],
            }
        return fascicolo_details_map
    finally:
        # A read-only workbook keeps its file open until close() is called.
        fascicoli_wb.close()
@safe
def prepare_file_fasicolo_map():
    """Load ``FILES_EXCEL_FILE`` into a lookup keyed by ``NomeDocFisico``.

    The first row of the active sheet is treated as the header and must
    contain the columns ``IstanzaId``, ``NomeDocFisico``, ``NomeDoc``,
    ``isFolder`` and ``Cartella``.

    Returns:
        A dict mapping each ``NomeDocFisico`` cell to a dict with the keys
        ``fascicolo_id``, ``nome_doc_fisico``, ``nome_doc``, ``is_folder`` and
        ``cartella``. The first row seen for a key is always stored; a later
        row with the same key replaces it only when its ``isFolder`` cell has
        a value. Because of ``@safe`` the caller receives a result object and
        reads the dict from its ``.value``.

    Raises:
        ValueError: If the workbook has no active sheet or a required column
            is missing from the header row.
    """
    print("Preparing file -> fascicolo map...")
    documenti_wb = openpyxl.load_workbook(FILES_EXCEL_FILE, read_only=True)
    try:
        documenti_ws = documenti_wb.active
        if not documenti_ws:
            raise ValueError("Documenti.xlsx is empty or corrupted")

        headers = [cell.value for cell in documenti_ws[1]]
        required = ("IstanzaId", "NomeDocFisico", "NomeDoc", "isFolder", "Cartella")
        missing = [column for column in required if column not in headers]
        if missing:
            # Same exception type as list.index(), but with a usable message.
            raise ValueError(
                f"{FILES_EXCEL_FILE}: missing column(s): {', '.join(missing)}"
            )
        fascicolo_id_idx = headers.index("IstanzaId")
        nome_doc_fisico_idx = headers.index("NomeDocFisico")
        nome_doc_idx = headers.index("NomeDoc")
        is_folder_idx = headers.index("isFolder")
        cartella_idx = headers.index("Cartella")

        file_fascicolo_map = {}
        for row in documenti_ws.iter_rows(min_row=2, values_only=True):
            key = row[nome_doc_fisico_idx]
            is_folder = row[is_folder_idx]
            # Empty cells are read as None (not ""), so both must be treated
            # as "isFolder undefined"; such a duplicate keeps the first entry.
            if key in file_fascicolo_map and is_folder in (None, ""):
                continue
            file_fascicolo_map[key] = {
                "fascicolo_id": row[fascicolo_id_idx],
                "nome_doc_fisico": key,
                "nome_doc": row[nome_doc_idx],
                "is_folder": is_folder,
                "cartella": row[cartella_idx],
            }
        return file_fascicolo_map
    finally:
        # A read-only workbook keeps its file open until close() is called.
        documenti_wb.close()
def _clean_component(value):
    """Sanitize one spreadsheet cell for use as a path component ("" if empty)."""
    if value is None:
        return ""
    return sanitize_name(str(value))


@safe
def process_file(file_id, origin_path, file_details, fascicolo_details_map):
    """Copy one physical file to its place in the output tree.

    The destination is
    ``OUTPUT_DIR / "<codice> - <descrizione>" / [<cartella> /] <nome_doc>``.
    The ``cartella`` level is omitted when that value is empty, and the
    `` - <descrizione>`` part is omitted when the description is empty.

    Args:
        file_id: Key used to look the file up in ``file_details``.
        origin_path: ``Path`` of the file to copy.
        file_details: Mapping of file id to a dict with at least the keys
            ``fascicolo_id``, ``nome_doc`` and ``cartella``.
        fascicolo_details_map: Mapping of fascicolo id to a dict with the
            keys ``codice`` and ``descrizione``.

    Returns:
        ``None``. A file that is missing from either mapping is skipped with
        a warning in the log. The function is wrapped by ``@safe``; the
        caller inspects the outcome with ``ok()`` / ``traceback_of()``.
    """
    details = file_details.get(file_id)
    if not details:
        logger.warning(f"{file_id}: File not referenced in {FILES_EXCEL_FILE}")
        return

    fascicolo_id = details.get("fascicolo_id")
    fasicolo_details = fascicolo_details_map.get(fascicolo_id)
    if not fasicolo_details:
        logger.warning(
            f"{file_id}: File fascicolo {fascicolo_id} not referenced in {FASCICOLI_EXCEL_FILE}"
        )
        return

    # NOTE(review): codice is used verbatim; a value containing a path
    # separator would create nested folders. Confirm the codes are path-safe.
    fascicolo_codice = fasicolo_details["codice"]
    fascicolo_descrizione = _clean_component(fasicolo_details["descrizione"])
    if fascicolo_descrizione:
        fascicolo_dir_name = f"{fascicolo_codice} - {fascicolo_descrizione}"
    else:
        # Avoid a folder name that ends in " - " when there is no description.
        fascicolo_dir_name = f"{fascicolo_codice}"

    # Empty cells arrive as None; they must reach the fallbacks below instead
    # of failing inside sanitize_name.
    sub_dir = _clean_component(details["cartella"])
    file_name = _clean_component(details["nome_doc"])
    if not file_name:
        logger.warning(f"File {file_id} has no `nome_doc`, using {origin_path.name}")
        file_name = origin_path.name

    destination_dir = OUTPUT_DIR / fascicolo_dir_name
    if sub_dir:
        destination_dir = destination_dir / sub_dir
    destination_path = destination_dir / file_name

    destination_path.parent.mkdir(parents=True, exist_ok=True)
    if destination_path.exists():
        # Two documents can resolve to the same name; shutil.copy replaces the
        # earlier copy, so leave a trace of it in the log.
        logger.warning(f"{file_id}: overwriting existing {destination_path}")
    shutil.copy(origin_path, destination_path)
    logger.info(f"{file_id}: OK")
def _report_preparation_failure(step, result):
    """Log a failed preparation step with its traceback and notify the console."""
    logger.error(f"{step}:\n{traceback_of(result)}")
    print(f"{step} failed, see {OUTPUT_LOG}")


def main():
    """Run the whole job: load the inputs, then copy every file found.

    The three preparation steps run in order; the job stops at the first one
    that fails. Each file is then handed to ``process_file``; a failure on one
    file is logged and does not stop the remaining copies.
    """
    setup_script()
    print("Job started")

    files_list = prepare_files_list()
    if not ok(files_list):
        _report_preparation_failure("Preparing files list", files_list)
        return

    fascicolo_details_map = prepare_fascicolo_details_map()
    if not ok(fascicolo_details_map):
        _report_preparation_failure(
            "Preparing fascicolo -> details map", fascicolo_details_map
        )
        return

    file_fascicolo_map = prepare_file_fasicolo_map()
    if not ok(file_fascicolo_map):
        _report_preparation_failure(
            "Preparing file -> fascicolo map", file_fascicolo_map
        )
        return

    total_files = len(files_list.value)
    print(f"Started copying {total_files} files")

    failed_files = 0
    for processed_files, (file, file_path) in enumerate(
        files_list.value.items(), start=1
    ):
        result = process_file(
            file, file_path, file_fascicolo_map.value, fascicolo_details_map.value
        )
        percentage = (processed_files / total_files) * 100
        # "\r" keeps the progress on a single console line.
        print(f"Copying files: {percentage:.2f}%", end="\r")
        if not ok(result):
            failed_files += 1
            logger.error(f"{file}:\n{traceback_of(result)}")

    print("\nJob completed")
    if failed_files:
        print(f"{failed_files} file(s) failed, see {OUTPUT_LOG}")
# Run the job only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment