@tailot
Last active May 27, 2025 19:50

This Colab notebook builds a multilingual spoken number recognition system, from dataset generation and CNN model training to TFJS conversion and real-time browser-based inference.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/tailot/ffeec19c350f3d124e104c2e9f930b6f/numbermodel.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "FKtTmA7Lf-3U"
},
"outputs": [],
"source": [
"!apt-get update && sudo apt-get install espeak-ng"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "lYHAcz3FOLxl"
},
"outputs": [],
"source": [
"import os\n",
"import subprocess\n",
"import shutil\n",
"\n",
"script_filename = \"generate_dataset.sh\"\n",
"\n",
"bash_script_content = \"\"\"#!/bin/bash\n",
"\n",
"NUM_START=1\n",
"NUM_END=20\n",
"BASE_OUTPUT_DIR=\"./speech_dataset_7lang_1_200_script_output\"\n",
"AUDIO_DIR_NAME=\"audio_files\"\n",
"\n",
"declare -A LANG_VOICES\n",
"LANG_VOICES[\"english\"]=\"en\"\n",
"LANG_VOICES[\"italian\"]=\"it\"\n",
"LANG_VOICES[\"spanish\"]=\"es\"\n",
"LANG_VOICES[\"french\"]=\"fr\"\n",
"LANG_VOICES[\"german\"]=\"de\"\n",
"LANG_VOICES[\"arabic\"]=\"ar\"\n",
"LANG_VOICES[\"japanese\"]=\"ja\"\n",
"\n",
"VOICE_VARIANTS=(\n",
" \"\" \"+f1\" \"+f2\" \"+f3\" \"+f4\"\n",
" \"+m1\" \"+m2\" \"+m3\" \"+m4\" \"+m5\" \"+m6\"\n",
" \"+klatt\" \"+klatt2\" \"+klatt3\"\n",
")\n",
"\n",
"PITCH_VALUES=(30 50 70 85)\n",
"SPEED_VALUES=(120 160 200)\n",
"WORD_GAP_VALUES=(0 5)\n",
"\n",
"echo \"Creating numerical speech dataset (Bash Script)...\"\n",
"mkdir -p \"${BASE_OUTPUT_DIR}/${AUDIO_DIR_NAME}\"\n",
"\n",
"num_numbers=$((NUM_END - NUM_START + 1))\n",
"num_languages=${#LANG_VOICES[@]}\n",
"num_voice_variants=${#VOICE_VARIANTS[@]}\n",
"num_pitch_values=${#PITCH_VALUES[@]}\n",
"num_speed_values=${#SPEED_VALUES[@]}\n",
"num_word_gap_values=${#WORD_GAP_VALUES[@]}\n",
"\n",
"expected_total_files=$(( num_numbers * num_languages * num_voice_variants * num_pitch_values * num_speed_values * num_word_gap_values ))\n",
"estimated_size_kb=$(( expected_total_files * 20 )) # Estimate with 20KB/file for numbers\n",
"estimated_size_mb=$(( estimated_size_kb / 1024 ))\n",
"# Using bc for floating point GB calculation for better precision if NUM_END is very high\n",
"estimated_size_gb=$(echo \"scale=2; $estimated_size_kb / (1024*1024)\" | bc)\n",
"\n",
"\n",
"echo \"CONFIGURED PARAMETERS (Bash Script):\"\n",
"echo \" Numbers: $num_numbers (from $NUM_START to $NUM_END)\"\n",
"echo \" Languages: $num_languages (${!LANG_VOICES[@]})\"\n",
"echo \" Voice Variants: $num_voice_variants\"\n",
"echo \" Pitch Values: $num_pitch_values\"\n",
"echo \" Speed Values: $num_speed_values\"\n",
"echo \" Word Gap Values: $num_word_gap_values\"\n",
"echo \"-----------------------------------------------------\"\n",
"echo \"Expected about $expected_total_files files to be generated.\"\n",
"echo \"Estimated size (based on ~20KB/file): ${estimated_size_mb} MB (~${estimated_size_gb} GB)\"\n",
"echo \"WARNING: Generation will take time if NUM_END is high!\"\n",
"echo \"-----------------------------------------------------\"\n",
"\n",
"read -p \"Proceed with generation via Bash script? (y/N): \" confirm\n",
"if [[ \"$confirm\" != [yY] && \"$confirm\" != [yY][eE][sS] ]]; then\n",
" echo \"Generation (Bash Script) cancelled.\"\n",
" exit 0\n",
"fi\n",
"\n",
"total_files_generated=0\n",
"file_counter=0\n",
"\n",
"for lang_name in \"${!LANG_VOICES[@]}\"; do\n",
" base_voice_id=\"${LANG_VOICES[$lang_name]}\"\n",
" lang_output_dir=\"${BASE_OUTPUT_DIR}/${AUDIO_DIR_NAME}/${lang_name}\"\n",
" mkdir -p \"$lang_output_dir\"\n",
" echo \"\"\n",
" echo \"------------------------------------\"\n",
" echo \"Generating (Bash Script) for Language: $lang_name (Base espeak voice: $base_voice_id)\"\n",
" echo \"------------------------------------\"\n",
"\n",
" for number in $(seq \"$NUM_START\" \"$NUM_END\"); do\n",
" text_to_speak=\"$number\"\n",
" for variant_suffix in \"${VOICE_VARIANTS[@]}\"; do\n",
" current_voice_for_espeak=\"${base_voice_id}${variant_suffix}\"\n",
" variant_name_for_file=$(echo \"$variant_suffix\" | tr -d '+-' | tr -cd '[:alnum:]')\n",
" if [ -z \"$variant_name_for_file\" ]; then variant_name_for_file=\"base\"; fi\n",
"\n",
" for current_pitch in \"${PITCH_VALUES[@]}\"; do\n",
" for current_speed in \"${SPEED_VALUES[@]}\"; do\n",
" for current_gap in \"${WORD_GAP_VALUES[@]}\"; do\n",
" ((file_counter++))\n",
" output_filename=\"${lang_output_dir}/${lang_name}_num_${number}_var_${variant_name_for_file}_p${current_pitch}_s${current_speed}_g${current_gap}.wav\"\n",
"\n",
" if (( file_counter % 200 == 0 )); then\n",
" echo \" ($file_counter/$expected_total_files) Generating (Bash Script): ${lang_name}_num_${number}_var_${variant_name_for_file}...\"\n",
" fi\n",
"\n",
" espeak-ng -v \"$current_voice_for_espeak\" -p \"$current_pitch\" -s \"$current_speed\" -g \"$current_gap\" -w \"$output_filename\" \"$text_to_speak\" > /dev/null 2>&1\n",
" if [ $? -ne 0 ]; then\n",
" echo \" ERROR ($?): espeak-ng (Bash Script) failed for $output_filename (Voice: $current_voice_for_espeak)\"\n",
" else\n",
" ((total_files_generated++))\n",
" fi\n",
" done\n",
" done\n",
" done\n",
" done\n",
" done\n",
"done\n",
"\n",
"echo \"\"\n",
"echo \"--- Dataset generation (Bash Script) completed! ---\"\n",
"echo \"Audio files actually generated: $total_files_generated / $expected_total_files (expected)\"\n",
"echo \"Audio files saved in: ${BASE_OUTPUT_DIR}/${AUDIO_DIR_NAME}\"\n",
"\n",
"if command -v du &> /dev/null && command -v bc &> /dev/null; then\n",
" echo \"Calculating actual size (Bash Script)...\"\n",
" actual_size_kb=$(du -sk \"${BASE_OUTPUT_DIR}/${AUDIO_DIR_NAME}\" | cut -f1)\n",
" if [[ -n \"$actual_size_kb\" && \"$actual_size_kb\" -gt 0 ]]; then\n",
" actual_size_mb=$((actual_size_kb / 1024))\n",
" actual_size_gb=$(echo \"scale=2; $actual_size_kb / (1024*1024)\" | bc)\n",
" echo \"Actual size: ${actual_size_kb} KB (${actual_size_mb} MB / ${actual_size_gb} GB)\"\n",
" elif [[ -n \"$actual_size_kb\" ]]; then\n",
" echo \"Actual size: ${actual_size_kb} KB (0 MB / 0.00 GB)\"\n",
" else\n",
" echo \"Could not calculate actual size with 'du'.\";\n",
" fi\n",
"fi\n",
"\"\"\"\n",
"\n",
"def main():\n",
" try:\n",
" with open(script_filename, \"w\") as f:\n",
" f.write(bash_script_content)\n",
" print(f\"Bash script saved as '{script_filename}'\")\n",
"\n",
" try:\n",
" os.chmod(script_filename, 0o755)\n",
" print(f\"Execution permissions set for '{script_filename}' (for Unix-like environments).\")\n",
" except Exception as e:\n",
" print(f\"Warning: Could not set execution permissions (could be Windows or missing permissions): {e}\")\n",
"\n",
" except IOError as e:\n",
" print(f\"Error writing file '{script_filename}': {e}\")\n",
" return\n",
"\n",
" if shutil.which(\"espeak-ng\") is None:\n",
" print(\"Command 'espeak-ng' not found. Attempting installation (requires sudo/apt)...\")\n",
" try:\n",
" print(\"Updating package list (sudo apt-get update)...\")\n",
" update_process = subprocess.run([\"sudo\", \"apt-get\", \"update\"], check=True, capture_output=True, text=True)\n",
" # print(\"apt-get update output:\", update_process.stdout) # Usually too verbose\n",
"\n",
" print(\"Installing espeak-ng (sudo apt-get install -y espeak-ng)...\")\n",
" install_process = subprocess.run([\"sudo\", \"apt-get\", \"install\", \"-y\", \"espeak-ng\"], check=True, capture_output=True, text=True)\n",
" # print(\"espeak-ng installation output:\", install_process.stdout) # Usually too verbose\n",
" print(\"'espeak-ng' should now be installed.\")\n",
" except subprocess.CalledProcessError as e:\n",
" print(f\"Error during 'espeak-ng' installation steps: {e}\")\n",
" if e.stdout:\n",
" print(\"Stdout from failed command:\", e.stdout)\n",
" if e.stderr:\n",
" print(\"Stderr from failed command:\", e.stderr)\n",
" print(\"You may need to install 'espeak-ng' manually.\")\n",
" return # Potentially exit if espeak-ng is critical and installation failed\n",
" except FileNotFoundError:\n",
" print(\"Command 'sudo' or 'apt-get' not found. Cannot install 'espeak-ng' automatically.\")\n",
" print(\"Ensure 'espeak-ng' is installed and in PATH.\")\n",
" return # Potentially exit\n",
"\n",
" print(f\"\\nExecuting Bash script '{script_filename}'...\")\n",
" try:\n",
" process = subprocess.run(\n",
" [f\"./{script_filename}\"],\n",
" check=True,\n",
" text=True,\n",
" input=\"y\\n\" # Provide 'y' followed by a newline to the Bash script's stdin\n",
" )\n",
" # The Bash script's output (progress, final messages) will be printed\n",
" # directly to the terminal as capture_output=False (default).\n",
" print(f\"\\nBash script '{script_filename}' execution reported success by Python.\")\n",
"\n",
" except FileNotFoundError:\n",
" print(f\"Error: Script '{script_filename}' not found. Ensure it was created correctly and is in the correct path.\")\n",
" except subprocess.CalledProcessError as e:\n",
" print(f\"Error during Bash script '{script_filename}' execution:\")\n",
" print(f\" Return code: {e.returncode}\")\n",
" # If using input=\"y\\n\", stdout and stderr are not captured in e.stdout/e.stderr\n",
" # unless capture_output=True is also added.\n",
" # The Bash script's own output (if it printed errors before exiting non-zero)\n",
" # should have appeared directly on the terminal.\n",
" print(f\" (Bash script output, if any, should have appeared above this message)\")\n",
" except Exception as e:\n",
" print(f\"An unexpected error occurred during script execution: {e}\")\n",
"\n",
"if __name__ == '__main__':\n",
" main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "pQZRgRDlt2Xj"
},
"outputs": [],
"source": [
"pip install gTTS librosa tensorflow scikit-learn numpy tensorflowjs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "2dj8MgYgOcmo"
},
"outputs": [],
"source": [
"import os\n",
"import json\n",
"import numpy as np\n",
"# from gtts import gTTS # Kept but not used if data_source_mode = \"scan_external\"\n",
"# import pyttsx3 # Kept but not used if data_source_mode = \"scan_external\"\n",
"import librosa\n",
"import tensorflow as tf\n",
"from tensorflow.keras.models import Sequential\n",
"from tensorflow.keras.layers import Conv1D, MaxPooling1D, Dropout, Flatten, Dense\n",
"from tensorflow.keras.utils import to_categorical\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.preprocessing import StandardScaler, LabelEncoder\n",
"import shutil\n",
"import re # For parsing filenames\n",
"import traceback\n",
"import math # For math.ceil\n",
"import gc # Garbage Collector\n",
"\n",
"### CONFIGURATIONS ###\n",
"CONFIG = {\n",
" \"data_source_mode\": \"scan_external\",\n",
" \"scan_audio_root_dir\": \"./speech_dataset_7lang_1_200_script_output/audio_files\",\n",
" \"languages\": {\n",
" \"english\": \"en\", \"italian\": \"it\", \"spanish\": \"es\", \"french\": \"fr\",\n",
" \"german\": \"de\", \"arabic\": \"ar\", \"chinese_simplified\": \"zh-CN\", \"japanese\": \"ja\"\n",
" },\n",
" \"num_start\": 1, \"num_end\": 200,\n",
" \"use_pyttsx3\": True, \"pyttsx3_rate_adjustment\": -40, \"pyttsx3_voice_variants\": [\"\", \"+f1\", \"+m1\"],\n",
" \"python_script_output_base_dir\": \"./python_processing_output_scan_mode_opt\",\n",
" \"audio_dir_name_internal_gen\": \"audio_files_internal\",\n",
" \"model_dir_name\": \"model\",\n",
" \"sr\": 16000, \"n_mfcc\": 40, \"max_pad_len_mfcc\": 120,\n",
" \"test_size\": 0.2, \"random_state\": 42,\n",
" \"epochs\": 5000, # Reduced for quick tests with generator\n",
" \"batch_size\": 16, # Kept, the generator will use it\n",
"}\n",
"\n",
"MODEL_DIR = os.path.join(CONFIG[\"python_script_output_base_dir\"], CONFIG[\"model_dir_name\"])\n",
"INTERNAL_AUDIO_GEN_DIR = os.path.join(CONFIG[\"python_script_output_base_dir\"], CONFIG[\"audio_dir_name_internal_gen\"])\n",
"\n",
"os.makedirs(MODEL_DIR, exist_ok=True)\n",
"if CONFIG.get(\"data_source_mode\") != \"scan_external\":\n",
" from gtts import gTTS # Conditional import\n",
" import pyttsx3 # Conditional import\n",
" os.makedirs(INTERNAL_AUDIO_GEN_DIR, exist_ok=True)\n",
"\n",
"\n",
"def sanitize_variant_name_py(variant_suffix):\n",
" if not variant_suffix:\n",
" return \"base\"\n",
" name = re.sub(r'[^a-zA-Z0-9]', '', variant_suffix)\n",
" return name if name else \"unknown\"\n",
"\n",
"def _scan_external_audio_files():\n",
" print(\"--- External Scan Mode Active ---\")\n",
" audio_items_to_process = []\n",
" all_label_keys = []\n",
"\n",
" scan_root = CONFIG.get(\"scan_audio_root_dir\")\n",
" if not scan_root or not os.path.isdir(scan_root):\n",
" print(f\"ERROR: Scan directory '{scan_root}' is invalid.\")\n",
" return [], None\n",
" print(f\"Scanning audio in: {scan_root}\")\n",
" filename_pattern = re.compile(r'([a-zA-Z0-9]+)_num_(\\d+)_var_([a-zA-Z0-9]+)_p(\\d+)_s(\\d+)_g(\\d+)\\.wav')\n",
" found_files_count = 0\n",
" for lang_dir_name in os.listdir(scan_root):\n",
" lang_dir_path = os.path.join(scan_root, lang_dir_name)\n",
" if os.path.isdir(lang_dir_path):\n",
" current_lang_name_from_folder = lang_dir_name\n",
" for filename in os.listdir(lang_dir_path):\n",
" if filename.endswith(\".wav\"):\n",
" file_path = os.path.join(lang_dir_path, filename)\n",
" match = filename_pattern.match(filename)\n",
" if match:\n",
" lang_from_filename, number_str, variant_str, pitch_str, speed_str, gap_str = match.groups()\n",
" if lang_from_filename.lower() != current_lang_name_from_folder.lower():\n",
" print(f\" WARN: Language mismatch for {filename}. Folder: '{current_lang_name_from_folder}', Filename: '{lang_from_filename}'. Using language from folder.\")\n",
" label_key = f\"{current_lang_name_from_folder}_{number_str}\"\n",
" all_label_keys.append(label_key)\n",
" audio_items_to_process.append({\n",
" \"path\": file_path, \"language_folder\": current_lang_name_from_folder,\n",
" \"number\": int(number_str), \"label_text\": label_key,\n",
" })\n",
" found_files_count += 1\n",
" # else: print(f\" WARN: Filename does not match regex (from Bash script naming convention), skipped: {os.path.join(current_lang_name_from_folder,filename)}\")\n",
" if not all_label_keys:\n",
" print(\"No labels derived. Check directory, filenames, regex pattern.\")\n",
" return [], None\n",
" print(f\"Found {found_files_count} matching .wav files.\")\n",
" le = LabelEncoder()\n",
" unique_sorted_labels = sorted(list(set(all_label_keys)))\n",
" le.fit(unique_sorted_labels)\n",
" print(f\"LabelEncoder trained on {len(le.classes_)} classes.\")\n",
" final_audio_paths_labels_info = []\n",
" for item in audio_items_to_process:\n",
" try:\n",
" item[\"class_id\"] = int(le.transform([item[\"label_text\"]])[0])\n",
" final_audio_paths_labels_info.append(item)\n",
" except ValueError:\n",
" print(f\"ERROR: Label '{item['label_text']}' not in LabelEncoder. Skipping: {item['path']}\")\n",
" label_encoder_path = os.path.join(MODEL_DIR, \"label_encoder_classes.json\")\n",
" with open(label_encoder_path, 'w') as f:\n",
" json.dump(list(le.classes_), f, indent=4)\n",
" print(f\"Label map saved: {label_encoder_path}\")\n",
" if final_audio_paths_labels_info:\n",
" print(f\"Audio samples (from scan) to process: {len(final_audio_paths_labels_info)}\")\n",
" return final_audio_paths_labels_info, le\n",
"\n",
"\n",
"def _generate_internal_audio_files():\n",
" print(\"--- Internal Generation Mode Active ---\")\n",
" if CONFIG.get(\"data_source_mode\") == \"scan_external\": return [], None # Should not be called in this mode\n",
" audio_paths_labels_info = []\n",
" all_label_keys = []\n",
" for lang_name in CONFIG[\"languages\"].keys():\n",
" for number in range(CONFIG[\"num_start\"], CONFIG[\"num_end\"] + 1):\n",
" all_label_keys.append(f\"{lang_name}_{number}\")\n",
" if not all_label_keys: return [], None\n",
" le = LabelEncoder()\n",
" le.fit(sorted(list(set(all_label_keys))))\n",
" print(f\"LabelEncoder trained on {len(le.classes_)} classes (internal generation).\")\n",
"\n",
" pyttsx3_engine = None\n",
" if CONFIG.get(\"use_pyttsx3\", False):\n",
" try:\n",
" import pyttsx3 # Local import for this function\n",
" pyttsx3_engine = pyttsx3.init()\n",
" current_rate = pyttsx3_engine.getProperty('rate')\n",
" pyttsx3_engine.setProperty('rate', current_rate + CONFIG.get(\"pyttsx3_rate_adjustment\", 0))\n",
" except Exception as e:\n",
" print(f\"ERROR initializing pyttsx3: {e}\")\n",
" pyttsx3_engine = None\n",
" from gtts import gTTS # Local import\n",
"\n",
" for lang_name, lang_code_gtts in CONFIG[\"languages\"].items():\n",
" lang_audio_dir = os.path.join(INTERNAL_AUDIO_GEN_DIR, lang_name)\n",
" os.makedirs(lang_audio_dir, exist_ok=True)\n",
" # ... (pyttsx3 voice selection logic omitted for brevity, you can reintegrate it if necessary) ...\n",
" for number in range(CONFIG[\"num_start\"], CONFIG[\"num_end\"] + 1):\n",
" text_to_speak = str(number)\n",
" label_key = f\"{lang_name}_{number}\"\n",
" class_id = int(le.transform([label_key])[0])\n",
" # gTTS\n",
" file_name_gtts = f\"{lang_name}_num_{number}_gtts.mp3\"\n",
" file_path_gtts = os.path.join(lang_audio_dir, file_name_gtts)\n",
" if not os.path.exists(file_path_gtts):\n",
" try:\n",
" tts_gtts = gTTS(text=text_to_speak, lang=lang_code_gtts, slow=False)\n",
" tts_gtts.save(file_path_gtts)\n",
" audio_paths_labels_info.append({\"path\": file_path_gtts, \"label_text\": label_key, \"class_id\": class_id, \"engine\": \"gtts\"})\n",
" except Exception as e: print(f\"ERROR gTTS for {file_path_gtts}: {e}\")\n",
" else: audio_paths_labels_info.append({\"path\": file_path_gtts, \"label_text\": label_key, \"class_id\": class_id, \"engine\": \"gtts\"})\n",
" # pyttsx3\n",
" if pyttsx3_engine:\n",
" variants_to_try = CONFIG.get(\"pyttsx3_voice_variants\", [\"\"])\n",
" for variant_suffix in variants_to_try:\n",
" variant_name_for_file = sanitize_variant_name_py(variant_suffix)\n",
" # ... (pyttsx3 voice selection and variant handling logic omitted for brevity) ...\n",
" pyttsx3_filename = f\"{lang_name}_num_{number}_pyttsx3_var_{variant_name_for_file}.wav\"\n",
" pyttsx3_filepath = os.path.join(lang_audio_dir, pyttsx3_filename)\n",
" if not os.path.exists(pyttsx3_filepath):\n",
" try:\n",
" # pyttsx3_engine.setProperty('voice', ...) # If needed\n",
" pyttsx3_engine.save_to_file(text_to_speak, pyttsx3_filepath)\n",
" pyttsx3_engine.runAndWait()\n",
" audio_paths_labels_info.append({\"path\": pyttsx3_filepath, \"label_text\": label_key, \"class_id\": class_id, \"engine\": f\"pyttsx3_var_{variant_name_for_file}\"})\n",
" except Exception as e: print(f\"ERROR pyttsx3 for {pyttsx3_filepath}: {e}\")\n",
" else: audio_paths_labels_info.append({\"path\": pyttsx3_filepath, \"label_text\": label_key, \"class_id\": class_id, \"engine\": f\"pyttsx3_var_{variant_name_for_file}\"})\n",
" label_encoder_path = os.path.join(MODEL_DIR, \"label_encoder_classes.json\")\n",
" with open(label_encoder_path, 'w') as f: json.dump(list(le.classes_), f, indent=4)\n",
" print(f\"Label map (internal generation) saved: {label_encoder_path}\")\n",
" if audio_paths_labels_info: print(f\"Audio samples (internal generation) referenced: {len(audio_paths_labels_info)}\")\n",
" return audio_paths_labels_info, le\n",
"\n",
"\n",
"def generate_audio_data_controller():\n",
" mode = CONFIG.get(\"data_source_mode\", \"generate_internal\")\n",
" if mode == \"scan_external\":\n",
" print(\"--- Phase 1: External Audio File Scan Selected ---\")\n",
" return _scan_external_audio_files()\n",
" else:\n",
" print(\"--- Phase 1: Internal Audio File Generation Selected ---\")\n",
" return _generate_internal_audio_files()\n",
"\n",
"\n",
"def fit_scaler(file_paths_for_scaler):\n",
" \"\"\"\n",
" Extracts MFCCs from a list of files and fits the StandardScaler.\n",
" Uses partial_fit to handle large datasets without loading everything into RAM.\n",
" \"\"\"\n",
" print(\"\\n--- Phase 2a: Fitting MFCC Scaler ---\")\n",
" scaler = StandardScaler()\n",
" mfcc_concatenated_for_scaler = [] # Temporary list for chunks\n",
" CHUNK_SIZE_FOR_SCALER_FIT = 512 # Number of files to process before partial_fit\n",
"\n",
" processed_files = 0\n",
" for i, item in enumerate(file_paths_for_scaler):\n",
" file_path = item[\"path\"]\n",
" try:\n",
" y, sr_loaded = librosa.load(file_path, sr=CONFIG[\"sr\"], mono=True)\n",
" mfcc = librosa.feature.mfcc(y=y, sr=sr_loaded, n_mfcc=CONFIG[\"n_mfcc\"])\n",
" # No need to transpose and pad here, just for fitting the scaler.\n",
" # The scaler expects (n_samples, n_features) where n_features is n_mfcc,\n",
" # and n_samples is the total number of frames from all files.\n",
" mfcc_concatenated_for_scaler.append(mfcc.T) # Add (time_steps, n_mfcc)\n",
" processed_files += 1\n",
"\n",
" if len(mfcc_concatenated_for_scaler) >= CHUNK_SIZE_FOR_SCALER_FIT or i == len(file_paths_for_scaler) - 1:\n",
" if mfcc_concatenated_for_scaler: # Ensure there's something to process\n",
" current_chunk_to_fit = np.vstack(mfcc_concatenated_for_scaler) # (total_frames_in_chunk, n_mfcc)\n",
" scaler.partial_fit(current_chunk_to_fit)\n",
" print(f\" Scaler partial_fit on {len(mfcc_concatenated_for_scaler)} files ({current_chunk_to_fit.shape[0]} frames), {processed_files}/{len(file_paths_for_scaler)} processed.\")\n",
" mfcc_concatenated_for_scaler = [] # Reset for the next chunk\n",
" gc.collect() # Free memory from the processed chunk\n",
"\n",
" except Exception as e:\n",
" print(f\"ERROR during MFCC extraction for scaler fitting from {file_path}: {e}\")\n",
"\n",
" if processed_files == 0:\n",
" print(\"WARNING: No files processed to fit the scaler. The scaler will not be fitted.\")\n",
" return None # Return None if the scaler cannot be fitted\n",
"\n",
" # Save scaler parameters\n",
" scaler_path = os.path.join(MODEL_DIR, \"mfcc_scaler.json\")\n",
" try:\n",
" scaler_params = {'mean_': scaler.mean_.tolist(), 'scale_': scaler.scale_.tolist()}\n",
" with open(scaler_path, 'w') as f:\n",
" json.dump(scaler_params, f, indent=4)\n",
" print(f\"MFCC scaler parameters saved to: {scaler_path}\")\n",
" except AttributeError: # If scaler was not fitted (e.g., mean_ or scale_ does not exist)\n",
" print(\"ERROR: Scaler does not seem to be fitted correctly (mean_ or scale_ missing).\")\n",
" return None\n",
" return scaler\n",
"\n",
"\n",
"class DataGenerator(tf.keras.utils.Sequence):\n",
" \"\"\"\n",
" Data generator for Keras. Loads and processes audio in batches.\n",
" \"\"\"\n",
" def __init__(self, file_infos, scaler, num_classes, batch_size, config, shuffle=True):\n",
" self.file_infos = file_infos # List of dictionaries {\"path\": ..., \"class_id\": ...}\n",
" self.scaler = scaler\n",
" self.num_classes = num_classes\n",
" self.batch_size = batch_size\n",
" self.config = config\n",
" self.shuffle = shuffle\n",
" self.sr = config[\"sr\"]\n",
" self.n_mfcc = config[\"n_mfcc\"]\n",
" self.max_pad_len_mfcc = config[\"max_pad_len_mfcc\"]\n",
" self.indexes = np.arange(len(self.file_infos))\n",
" if self.shuffle:\n",
" np.random.shuffle(self.indexes)\n",
"\n",
" def __len__(self):\n",
" # Number of batches per epoch\n",
" return math.ceil(len(self.file_infos) / self.batch_size)\n",
"\n",
" def __getitem__(self, index):\n",
" # Generate one batch of data\n",
" batch_indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]\n",
" batch_file_infos = [self.file_infos[k] for k in batch_indexes]\n",
"\n",
" X_batch = []\n",
" y_batch = []\n",
"\n",
" for item in batch_file_infos:\n",
" file_path = item[\"path\"]\n",
" class_id = item[\"class_id\"]\n",
" try:\n",
" y, sr_loaded = librosa.load(file_path, sr=self.sr, mono=True)\n",
" mfcc = librosa.feature.mfcc(y=y, sr=sr_loaded, n_mfcc=self.n_mfcc)\n",
" mfcc = mfcc.T # (time_steps, n_mfcc)\n",
"\n",
" if mfcc.shape[0] > self.max_pad_len_mfcc:\n",
" mfcc_padded = mfcc[:self.max_pad_len_mfcc, :]\n",
" else:\n",
" pad_width = self.max_pad_len_mfcc - mfcc.shape[0]\n",
" mfcc_padded = np.pad(mfcc, ((0, pad_width), (0, 0)), mode='constant')\n",
"\n",
" # Normalization with the fitted scaler\n",
" # The scaler expects (n_samples, n_features)\n",
" # Here mfcc_padded is (max_pad_len_mfcc, n_mfcc)\n",
" # We need to scale it row by row if the scaler was fitted on all frames\n",
" # Or, if the scaler was fitted on already padded/truncated MFCCs, then it's fine.\n",
" # The fit_scaler implementation fits on unpadded frames, so we scale the frames.\n",
" if self.scaler and hasattr(self.scaler, 'mean_') and hasattr(self.scaler, 'scale_'):\n",
" mfcc_scaled = self.scaler.transform(mfcc_padded) # Apply transformation\n",
" else: # Fallback if the scaler is not ready\n",
" print(f\"WARN: Scaler not available or not fitted for {file_path}, using unscaled MFCCs.\")\n",
" mfcc_scaled = mfcc_padded\n",
"\n",
"\n",
" X_batch.append(mfcc_scaled)\n",
" y_batch.append(class_id)\n",
" except Exception as e:\n",
" print(f\"ERROR DataGenerator processing {file_path}: {e}\\n{traceback.format_exc()}\")\n",
" # You might want to skip this file or add a placeholder\n",
" # For now, we skip it if it fails. This might reduce the batch size.\n",
" continue\n",
"\n",
" if not X_batch: # If all files in the batch failed\n",
" # Return an empty batch or a placeholder, depending on how Keras handles it\n",
" # It's better to ensure data is clean beforehand\n",
" print(f\"WARNING: Batch {index} is empty due to file processing errors.\")\n",
" # Return empty arrays with correct shapes if possible\n",
" # Or raise an error if this should not happen\n",
" empty_x = np.empty((0, self.max_pad_len_mfcc, self.n_mfcc), dtype=np.float32)\n",
" empty_y = np.empty((0, self.num_classes), dtype=np.float32)\n",
" return empty_x, empty_y\n",
"\n",
"\n",
" X_batch_np = np.array(X_batch, dtype=np.float32)\n",
" y_batch_one_hot = to_categorical(np.array(y_batch), num_classes=self.num_classes)\n",
"\n",
" return X_batch_np, y_batch_one_hot\n",
"\n",
" def on_epoch_end(self):\n",
" # Called at the end of each epoch\n",
" if self.shuffle:\n",
" np.random.shuffle(self.indexes)\n",
"\n",
"\n",
"def build_model(input_shape, num_classes):\n",
" model = Sequential([\n",
" Conv1D(filters=32, kernel_size=5, activation='relu', padding='same', input_shape=input_shape),\n",
" MaxPooling1D(pool_size=2, strides=2), Dropout(0.25),\n",
" Conv1D(filters=64, kernel_size=5, activation='relu', padding='same'),\n",
" MaxPooling1D(pool_size=2, strides=2), Dropout(0.25),\n",
" Flatten(),\n",
" Dense(128, activation='relu'), Dropout(0.5),\n",
" Dense(num_classes, activation='softmax')\n",
" ])\n",
" model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),\n",
" loss='categorical_crossentropy', metrics=['accuracy'])\n",
" return model\n",
"\n",
"def main():\n",
" print(\"Starting script (RAM optimized)...\")\n",
"\n",
" # Phase 1: Generation or Scanning of audio file metadata\n",
" audio_paths_labels_info, label_encoder = generate_audio_data_controller()\n",
"\n",
" if not audio_paths_labels_info or label_encoder is None:\n",
" print(\"No audio data or LabelEncoder. Aborting.\")\n",
" return\n",
"\n",
" num_classes = len(label_encoder.classes_)\n",
" if num_classes == 0:\n",
" print(\"ERROR: No classes found. Aborting.\")\n",
" return\n",
" print(f\"\\nTotal number of classes for the model: {num_classes}\")\n",
"\n",
" # Report class distribution from metadata (before split)\n",
" # This is useful to check for imbalance before splitting\n",
" labels_from_info = [item['class_id'] for item in audio_paths_labels_info]\n",
" unique_labels_info, counts_info = np.unique(labels_from_info, return_counts=True)\n",
" print(\"Sample distribution per class ID (from metadata):\")\n",
" for lbl_id, count in zip(unique_labels_info, counts_info):\n",
" class_name = label_encoder.inverse_transform([lbl_id])[0]\n",
" print(f\" Class '{class_name}' (ID: {lbl_id}): {count} samples\")\n",
"\n",
" min_samples_per_class_info = np.min(counts_info) if counts_info.size > 0 else 0\n",
"\n",
"\n",
" print(\"\\n--- Phase 3: Data Preparation (Split and Scaler) ---\")\n",
" train_files_info, test_files_info = [], []\n",
"\n",
" # Improved split logic to handle edge cases\n",
" if len(audio_paths_labels_info) < 2 :\n",
" print(\"WARNING: Fewer than 2 total samples. Using all data for training.\")\n",
" train_files_info = audio_paths_labels_info\n",
" elif CONFIG[\"test_size\"] == 0 or CONFIG[\"test_size\"] * len(audio_paths_labels_info) < 1:\n",
" print(\"No test set required. All data for training.\")\n",
" train_files_info = audio_paths_labels_info\n",
" elif min_samples_per_class_info < 2 and CONFIG[\"test_size\"] > 0 : # Not enough samples in at least one class to stratify\n",
" print(f\"WARNING: Least populated class has {min_samples_per_class_info} sample(s). Splitting without stratification.\")\n",
" try:\n",
" train_files_info, test_files_info = train_test_split(\n",
" audio_paths_labels_info,\n",
" test_size=CONFIG[\"test_size\"],\n",
" random_state=CONFIG[\"random_state\"],\n",
" stratify=None # Cannot stratify here if classes have only 1 sample\n",
" )\n",
" except ValueError as e:\n",
" print(f\"ERROR train_test_split (no stratify): {e}. Using all data for training.\")\n",
" train_files_info = audio_paths_labels_info\n",
" else: # Standard case with stratification (if test_size > 0)\n",
" try:\n",
" train_files_info, test_files_info = train_test_split(\n",
" audio_paths_labels_info,\n",
" test_size=CONFIG[\"test_size\"],\n",
" random_state=CONFIG[\"random_state\"],\n",
" stratify=[item['class_id'] for item in audio_paths_labels_info] # Stratify on class_ids\n",
" )\n",
" except ValueError as e:\n",
" print(f\"ERROR train_test_split (with stratify): {e}. Attempting without stratification.\")\n",
" train_files_info, test_files_info = train_test_split(\n",
" audio_paths_labels_info,\n",
" test_size=CONFIG[\"test_size\"],\n",
" random_state=CONFIG[\"random_state\"],\n",
" stratify=None\n",
" )\n",
"\n",
" if not train_files_info:\n",
" print(\"No training data available after split. Aborting.\")\n",
" return\n",
"\n",
" print(f\"Number of training samples: {len(train_files_info)}\")\n",
" print(f\"Number of test samples: {len(test_files_info)}\")\n",
"\n",
" # Phase 2a (modified): Fit the scaler ONLY on training data\n",
" # Pass train_files_info to fit the scaler\n",
" scaler = fit_scaler(train_files_info)\n",
" if scaler is None:\n",
" print(\"ERROR: Scaler not initialized correctly. Training might fail or yield poor results.\")\n",
" # One might decide to abort or continue without scaling (not recommended)\n",
"\n",
" # Create generators\n",
" train_generator = DataGenerator(train_files_info, scaler, num_classes, CONFIG[\"batch_size\"], CONFIG)\n",
"\n",
" val_generator = None\n",
" perform_evaluation = len(test_files_info) > 0\n",
" if perform_evaluation:\n",
" val_generator = DataGenerator(test_files_info, scaler, num_classes, CONFIG[\"batch_size\"], CONFIG, shuffle=False) # No shuffle for val/test\n",
"\n",
" print(\"\\n--- Phase 4: Model Definition and Training ---\")\n",
" input_shape_model = (CONFIG[\"max_pad_len_mfcc\"], CONFIG[\"n_mfcc\"])\n",
" model = build_model(input_shape=input_shape_model, num_classes=num_classes)\n",
" model.summary()\n",
"\n",
" print(\"\\nStarting training with generators...\")\n",
"\n",
" # Callbacks\n",
" callbacks_list = [\n",
" tf.keras.callbacks.EarlyStopping(\n",
" monitor='val_loss' if perform_evaluation else 'loss',\n",
" patience=15, restore_best_weights=True, verbose=1\n",
" ),\n",
" tf.keras.callbacks.ReduceLROnPlateau(\n",
" monitor='val_loss' if perform_evaluation else 'loss',\n",
" factor=0.2, patience=7, min_lr=0.00001, verbose=1\n",
" )\n",
" ]\n",
"\n",
" history = model.fit(\n",
" train_generator,\n",
" epochs=CONFIG[\"epochs\"],\n",
" validation_data=val_generator if perform_evaluation else None,\n",
" callbacks=callbacks_list,\n",
" verbose=1\n",
" # steps_per_epoch and validation_steps are automatically inferred by Keras\n",
" # when using a tf.keras.utils.Sequence as a generator.\n",
" )\n",
"\n",
" if perform_evaluation and val_generator:\n",
" print(\"\\nEvaluating on Test Set with generator...\")\n",
" loss, accuracy = model.evaluate(val_generator, verbose=0)\n",
" print(f\"Final Test Set Evaluation - Loss: {loss:.4f}, Accuracy: {accuracy:.4f}\")\n",
" else:\n",
" print(\"\\nModel trained. No valid test set for final evaluation during fit.\")\n",
" # Evaluate on training set if there's no test set\n",
" print(\"Evaluating on Training Set with generator...\")\n",
" loss_train, accuracy_train = model.evaluate(train_generator, verbose=0) # Warning: this might take a long time if the training set is large\n",
" print(f\"Training Set Evaluation - Loss: {loss_train:.4f}, Accuracy: {accuracy_train:.4f}\")\n",
"\n",
"\n",
" print(\"\\n--- Phase 5: Saving Model ---\")\n",
" keras_model_filename = \"speech_recognition_model.keras\"\n",
" keras_model_path = os.path.join(MODEL_DIR, keras_model_filename)\n",
" model.save(keras_model_path)\n",
" print(f\"Model saved in .keras format to: {keras_model_path}\")\n",
"\n",
" saved_model_dir_name = \"tf_saved_model_export\"\n",
" saved_model_path = os.path.join(MODEL_DIR, saved_model_dir_name)\n",
" try:\n",
" if os.path.exists(saved_model_path): shutil.rmtree(saved_model_path)\n",
" tf.saved_model.save(model, saved_model_path)\n",
" print(f\"Model exported in TensorFlow SavedModel format: {os.path.abspath(saved_model_path)}\")\n",
" except Exception as e:\n",
" print(f\"ERROR exporting SavedModel: {e}\\n{traceback.format_exc()}\")\n",
"\n",
" config_path = os.path.join(MODEL_DIR, \"training_config.json\")\n",
" serializable_config = {k: (v.tolist() if isinstance(v, (np.ndarray, np.generic)) else v)\n",
" for k, v in CONFIG.items()\n",
" if isinstance(v, (str, int, float, bool, list, dict, type(None), np.ndarray, np.generic))}\n",
" with open(config_path, 'w') as f:\n",
" json.dump(serializable_config, f, indent=4)\n",
" print(f\"Training configuration saved to: {config_path}\")\n",
"\n",
" print(\"\\n--- Script completed! ---\")\n",
"\n",
"if __name__ == '__main__':\n",
" main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "BdRzN6_gTE40"
},
"outputs": [],
"source": [
" !tensorflowjs_converter --input_format=tf_saved_model ./python_processing_output_scan_mode_opt/model/tf_saved_model_export ./speech_recognition_data_py/model/tfjs_model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "xewacF38aw6b"
},
"outputs": [],
"source": [
"import os\n",
"import shutil\n",
"import tarfile\n",
"import logging\n",
"\n",
"# Logging configuration\n",
"logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n",
"\n",
"# --- PATH CONFIGURATION ---\n",
"# The directory that originally contains the model exported by the TensorFlow.js converter\n",
"original_tfjs_model_dir = \"speech_recognition_data_py\"\n",
"# The final name of the TFJS model directory (as it should appear in the archive and after renaming)\n",
"final_tfjs_model_dir = \"tfjs_model\"\n",
"\n",
"# Source directory for JSON configuration files\n",
"source_config_dir = os.path.join(\"python_processing_output_scan_mode_opt\", \"model\")\n",
"\n",
"# List of JSON files to include in the archive\n",
"json_files_to_package = [\n",
" \"mfcc_scaler.json\",\n",
" \"training_config.json\",\n",
" \"label_encoder_classes.json\"\n",
"]\n",
"\n",
"# Name of the output tar.gz archive\n",
"output_archive_name = \"web_model_package.tar.gz\"\n",
"# ------------------------------------\n",
"\n",
"def create_package():\n",
" \"\"\"\n",
" Renames the TFJS model folder, copies configuration files,\n",
" and creates a tar.gz archive.\n",
" \"\"\"\n",
" model_dir_to_archive = None\n",
"\n",
" # 1. Handle the TFJS model directory (rename or use existing)\n",
" if os.path.exists(original_tfjs_model_dir) and os.path.isdir(original_tfjs_model_dir):\n",
" logging.info(f\"Found directory '{original_tfjs_model_dir}'.\")\n",
" if os.path.exists(final_tfjs_model_dir):\n",
" logging.warning(f\"Destination directory '{final_tfjs_model_dir}' already exists. It will be removed.\")\n",
" if os.path.isdir(final_tfjs_model_dir):\n",
" shutil.rmtree(final_tfjs_model_dir)\n",
" else:\n",
" os.remove(final_tfjs_model_dir)\n",
"\n",
" logging.info(f\"Renaming '{original_tfjs_model_dir}' to '{final_tfjs_model_dir}'.\")\n",
" os.rename(original_tfjs_model_dir, final_tfjs_model_dir)\n",
" model_dir_to_archive = final_tfjs_model_dir\n",
" elif os.path.exists(final_tfjs_model_dir) and os.path.isdir(final_tfjs_model_dir):\n",
" logging.info(f\"Found directory '{final_tfjs_model_dir}' (likely already renamed). This will be used.\")\n",
" model_dir_to_archive = final_tfjs_model_dir\n",
" else:\n",
" logging.error(f\"Error: No TFJS model directory found ('{original_tfjs_model_dir}' or '{final_tfjs_model_dir}'). Cannot proceed.\")\n",
" return\n",
"\n",
" # 2. Verify the existence of source JSON files\n",
" source_json_paths = []\n",
" all_json_found = True\n",
" for json_file in json_files_to_package:\n",
" path = os.path.join(source_config_dir, json_file)\n",
" if os.path.isfile(path):\n",
" source_json_paths.append({\"full_path\": path, \"arcname\": json_file})\n",
" logging.info(f\"JSON file found: '{path}'. It will be added to the archive as '{json_file}'.\")\n",
" else:\n",
" logging.error(f\"Error: Source JSON file not found: '{path}'.\")\n",
" all_json_found = False\n",
"\n",
" if not all_json_found:\n",
" logging.error(\"Some JSON files were not found. Cannot create package.\")\n",
" return\n",
"\n",
" # 3. Create the tar.gz archive\n",
" logging.info(f\"Creating archive '{output_archive_name}'...\")\n",
" try:\n",
" with tarfile.open(output_archive_name, \"w:gz\") as tar:\n",
" # Add the TFJS model directory\n",
" logging.info(f\"Adding directory '{model_dir_to_archive}' to the archive.\")\n",
" tar.add(model_dir_to_archive, arcname=os.path.basename(model_dir_to_archive))\n",
"\n",
" # Add the JSON files\n",
" for item in source_json_paths:\n",
" logging.info(f\"Adding file '{item['full_path']}' to the archive as '{item['arcname']}'.\")\n",
" tar.add(item['full_path'], arcname=item['arcname'])\n",
"\n",
" logging.info(f\"Archive '{output_archive_name}' created successfully!\")\n",
" logging.info(\"The archive contains:\")\n",
" logging.info(f\" - Directory: {os.path.basename(model_dir_to_archive)}/\")\n",
" for item in source_json_paths:\n",
" logging.info(f\" - File: {item['arcname']}\")\n",
"\n",
" except Exception as e:\n",
" logging.error(f\"Error during archive creation: {e}\")\n",
" # Consider cleanup or name restoration in case of error here\n",
"\n",
"if __name__ == \"__main__\":\n",
" # Check if the script is run from the correct directory (optional, but useful).\n",
" # This check assumes 'speech_recognition_data_py' is a subdirectory\n",
" # or that paths are relative to the script's location.\n",
" # For this example, we rely on the relative paths defined above.\n",
"\n",
" create_package()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "g4m_Xp-OoMnm"
},
"outputs": [],
"source": [
"from IPython.display import HTML, display\n",
"import re # For comment removal\n",
"\n",
"html_content = \"\"\"\n",
"<!DOCTYPE html>\n",
"<html lang=\"en\">\n",
"<head>\n",
" <meta charset=\"UTF-8\">\n",
" <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">\n",
" <title>Speech Recognition with TensorFlow.js and Meyda.js</title>\n",
" <script src=\"https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest/dist/tf.min.js\"></script>\n",
" <script src=\"https://cdn.jsdelivr.net/npm/[email protected]/dist/web/meyda.min.js\"></script>\n",
" <style>\n",
" body { font-family: Arial, sans-serif; margin: 20px; background-color: #f4f4f4; color: #333; }\n",
" .container { max-width: 600px; margin: auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 0 10px rgba(0,0,0,0.1); }\n",
" h1 { text-align: center; color: #333; }\n",
" button {\n",
" display: block; width: 100%; padding: 12px; margin-top: 10px;\n",
" color: white; border: none;\n",
" border-radius: 5px; font-size: 16px; cursor: pointer; transition: background-color 0.3s;\n",
" }\n",
" #recordButton { background-color: #28a745; }\n",
" #recordButton:hover:not(:disabled) { background-color: #218838; }\n",
" #replayButton { background-color: #007bff; }\n",
" #replayButton:hover:not(:disabled) { background-color: #0056b3; }\n",
" button:disabled { background-color: #cccccc; cursor: not-allowed; }\n",
" #status, #predictionResult {\n",
" margin-top: 15px; padding: 10px; border-radius: 5px;\n",
" text-align: center; font-size: 1em;\n",
" }\n",
" #status { background-color: #e9ecef; color: #495057; }\n",
" #predictionResult { background-color: #d4edda; color: #155724; font-weight: bold; min-height: 20px;}\n",
" .info { font-size: 0.9em; text-align: center; margin-top: 10px; color: #555;}\n",
" #audioPlayback { width: 100%; margin-top: 15px; }\n",
" </style>\n",
"</head>\n",
"<body>\n",
" <div class=\"container\">\n",
" <h1>Real-time Speech Recognition</h1>\n",
" <p class=\"info\">\n",
" Click \"Record and Recognize\" to record a short audio from the microphone.\n",
" You should pronounce a single number.\n",
" </p>\n",
"\n",
" <button id=\"recordButton\" disabled>Loading Resources...</button>\n",
" <button id=\"replayButton\" disabled>Replay Recording</button>\n",
" <audio id=\"audioPlayback\" controls></audio>\n",
"\n",
" <div id=\"status\">Initializing...</div>\n",
" <div id=\"predictionResult\"></div>\n",
" </div>\n",
"\n",
" <script>\n",
" const MODEL_JSON_PATH = './tfjs_model/model/tfjs_model/model.json';\n",
" const CONFIG_PATH = './training_config.json';\n",
" const SCALER_PATH = './mfcc_scaler.json';\n",
" const LABELS_PATH = './label_encoder_classes.json';\n",
"\n",
" const MEYDA_BUFFER_SIZE = 512;\n",
" const MEYDA_NUMBER_OF_MEL_BANDS = 128;\n",
"\n",
" let model;\n",
" let trainingConfig;\n",
" let mfccScalerParams;\n",
" let labelClasses;\n",
"\n",
" let audioContext;\n",
" let meydaAnalyzer;\n",
" let microphoneSource;\n",
" let collectedMfccFrames = [];\n",
" let frameCount = 0;\n",
"\n",
" let mediaRecorder;\n",
" let audioChunks = [];\n",
" let audioBlobURL = null;\n",
"\n",
" const recordButton = document.getElementById('recordButton');\n",
" const replayButton = document.getElementById('replayButton');\n",
" const audioPlaybackElement = document.getElementById('audioPlayback');\n",
" const statusElem = document.getElementById('status');\n",
" const predictionResultElem = document.getElementById('predictionResult');\n",
"\n",
" async function loadResources() {\n",
" statusElem.textContent = 'Loading model and configurations...';\n",
" try {\n",
" model = await tf.loadGraphModel(MODEL_JSON_PATH);\n",
"\n",
" const [configRes, scalerRes, labelsRes] = await Promise.all([\n",
" fetch(CONFIG_PATH),\n",
" fetch(SCALER_PATH),\n",
" fetch(LABELS_PATH)\n",
" ]);\n",
"\n",
" if (!configRes.ok) throw new Error(`Error loading training_config.json: ${configRes.statusText} (${configRes.status})`);\n",
" if (!scalerRes.ok) throw new Error(`Error loading mfcc_scaler.json: ${scalerRes.statusText} (${scalerRes.status})`);\n",
" if (!labelsRes.ok) throw new Error(`Error loading label_encoder_classes.json: ${labelsRes.statusText} (${labelsRes.status})`);\n",
"\n",
" trainingConfig = await configRes.json();\n",
" mfccScalerParams = await scalerRes.json();\n",
" labelClasses = await labelsRes.json();\n",
"\n",
" statusElem.textContent = 'Resources loaded. Ready to record.';\n",
" recordButton.textContent = 'Record and Recognize';\n",
" recordButton.disabled = false;\n",
" console.log('Model and resources loaded:', { model, trainingConfig, mfccScalerParams, labelClasses });\n",
"\n",
" } catch (error) {\n",
" console.error('Error during resource loading:', error);\n",
" statusElem.textContent = `Loading error: ${error.message}. Check console and file paths.`;\n",
" predictionResultElem.textContent = 'Could not load necessary resources.';\n",
" recordButton.textContent = 'Loading Error';\n",
" recordButton.disabled = true;\n",
" }\n",
" }\n",
"\n",
" loadResources();\n",
"\n",
" function mfccCallback(features) {\n",
" if (features.mfcc && frameCount < trainingConfig.max_pad_len_mfcc) {\n",
" collectedMfccFrames.push(Array.from(features.mfcc));\n",
" frameCount++;\n",
" statusElem.textContent = `Recording... ${frameCount}/${trainingConfig.max_pad_len_mfcc} frames`;\n",
" if (frameCount >= trainingConfig.max_pad_len_mfcc) {\n",
" stopRecordingAndProcess();\n",
" }\n",
" }\n",
" }\n",
"\n",
"\n",
" async function startRecording() {\n",
" if (!model || !trainingConfig || !mfccScalerParams || !labelClasses) {\n",
" statusElem.textContent = \"Resources not fully loaded. Please wait or check for errors.\";\n",
" console.error(\"Attempting to record before resources are fully loaded.\");\n",
" return;\n",
" }\n",
" recordButton.disabled = true;\n",
" replayButton.disabled = true;\n",
" audioPlaybackElement.src = '';\n",
" if (audioBlobURL) {\n",
" URL.revokeObjectURL(audioBlobURL);\n",
" audioBlobURL = null;\n",
" }\n",
" predictionResultElem.textContent = '';\n",
" collectedMfccFrames = [];\n",
" frameCount = 0;\n",
" audioChunks = [];\n",
"\n",
" try {\n",
" const stream = await navigator.mediaDevices.getUserMedia({ audio: true, video: false });\n",
"\n",
" audioContext = new AudioContext({ sampleRate: trainingConfig.sr });\n",
" console.log(`Requested AudioContext sample rate: ${trainingConfig.sr}, actual: ${audioContext.sampleRate}`);\n",
"\n",
" if (Math.abs(audioContext.sampleRate - trainingConfig.sr) > 100) {\n",
" statusElem.innerHTML += `<br><span style=\"color:orange;\">Warning: Browser sample rate (${audioContext.sampleRate}Hz) differs from training sample rate (${trainingConfig.sr}Hz). This might affect accuracy.</span>`;\n",
" }\n",
"\n",
" microphoneSource = audioContext.createMediaStreamSource(stream);\n",
"\n",
" mediaRecorder = new MediaRecorder(stream);\n",
" mediaRecorder.ondataavailable = (event) => {\n",
" if (event.data.size > 0) {\n",
" audioChunks.push(event.data);\n",
" }\n",
" };\n",
" mediaRecorder.onstop = () => {\n",
" const audioBlob = new Blob(audioChunks, { type: mediaRecorder.mimeType || 'audio/webm' });\n",
" audioBlobURL = URL.createObjectURL(audioBlob);\n",
" audioPlaybackElement.src = audioBlobURL;\n",
" replayButton.disabled = false;\n",
" console.log(\"Recording saved and ready for playback.\", audioBlobURL);\n",
" };\n",
" mediaRecorder.start();\n",
"\n",
" if (typeof Meyda === \"undefined\") {\n",
" throw new Error(\"Meyda.js library not found. Check the <script> tag.\");\n",
" }\n",
"\n",
" const meydaOptions = {\n",
" audioContext: audioContext,\n",
" source: microphoneSource,\n",
" bufferSize: MEYDA_BUFFER_SIZE,\n",
" featureExtractors: [\"mfcc\"],\n",
" callback: mfccCallback,\n",
" numberOfMFCCCoefficients: trainingConfig.n_mfcc,\n",
" };\n",
"\n",
" if (typeof Meyda.createMeydaAnalyzer === 'function') {\n",
" meydaAnalyzer = Meyda.createMeydaAnalyzer(meydaOptions);\n",
" } else if (typeof Meyda === 'function') {\n",
" meydaAnalyzer = new Meyda(meydaOptions);\n",
" } else {\n",
" throw new Error(\"Meyda is not a valid constructor nor does it have createMeydaAnalyzer.\");\n",
" }\n",
"\n",
" if (!meydaAnalyzer) {\n",
" throw new Error(\"MeydaAnalyzer initialization failed.\");\n",
" }\n",
"\n",
" meydaAnalyzer.start();\n",
" statusElem.textContent = `Recording started... wait for ${trainingConfig.max_pad_len_mfcc} frames.`;\n",
" const approxDuration = (trainingConfig.max_pad_len_mfcc * MEYDA_BUFFER_SIZE / audioContext.sampleRate).toFixed(1);\n",
" console.log(`Recording for approximately ${approxDuration} seconds.`);\n",
"\n",
" } catch (err) {\n",
" console.error(\"Error during recording startup or Meyda/MediaRecorder initialization:\", err);\n",
" statusElem.textContent = \"Microphone/setup error: \" + err.message;\n",
" if (err.name === \"NotAllowedError\" || err.name === \"PermissionDeniedError\" || err.name === \"SecurityError\") {\n",
" statusElem.textContent = \"Microphone permission denied or insecure context (HTTPS or localhost required).\";\n",
" } else if (err.name === \"NotFoundError\" || err.name === \"DevicesNotFoundError\") {\n",
" statusElem.textContent = \"No microphone found.\";\n",
" }\n",
" recordButton.disabled = false;\n",
" }\n",
" }\n",
"\n",
" function stopRecordingAndProcess() {\n",
" if (meydaAnalyzer) {\n",
" meydaAnalyzer.stop();\n",
" }\n",
" if (mediaRecorder && mediaRecorder.state === \"recording\") {\n",
" mediaRecorder.stop();\n",
" }\n",
" if (microphoneSource) {\n",
" if (microphoneSource.mediaStream && microphoneSource.mediaStream.getTracks) {\n",
" microphoneSource.mediaStream.getTracks().forEach(track => track.stop());\n",
" }\n",
" }\n",
" console.log(`Meyda analysis and audio recording complete. MFCC frames collected: ${collectedMfccFrames.length}`);\n",
" statusElem.textContent = 'Processing MFCC...';\n",
" processCollectedFeatures();\n",
" }\n",
"\n",
" function processCollectedFeatures() {\n",
" if (collectedMfccFrames.length === 0) {\n",
" statusElem.textContent = \"No MFCC data collected.\";\n",
" predictionResultElem.textContent = '';\n",
" recordButton.disabled = false;\n",
" return;\n",
" }\n",
"\n",
" let finalMfccs = [];\n",
" if (collectedMfccFrames.length >= trainingConfig.max_pad_len_mfcc) {\n",
" finalMfccs = collectedMfccFrames.slice(0, trainingConfig.max_pad_len_mfcc);\n",
" } else {\n",
" finalMfccs = [...collectedMfccFrames];\n",
" const padWidth = trainingConfig.max_pad_len_mfcc - collectedMfccFrames.length;\n",
" for (let i = 0; i < padWidth; i++) {\n",
" finalMfccs.push(new Array(trainingConfig.n_mfcc).fill(0));\n",
" }\n",
" }\n",
"\n",
" if (!mfccScalerParams || !mfccScalerParams.mean_ || !mfccScalerParams.scale_ ||\n",
" mfccScalerParams.mean_.length !== trainingConfig.n_mfcc ||\n",
" mfccScalerParams.scale_.length !== trainingConfig.n_mfcc) {\n",
" console.error(\"MFCC scaler parameters invalid or corrupt:\", mfccScalerParams, \"n_mfcc:\", trainingConfig.n_mfcc);\n",
" statusElem.textContent = \"Error: MFCC scaler parameters missing, corrupt, or incorrectly sized.\";\n",
" predictionResultElem.textContent = '';\n",
" recordButton.disabled = false;\n",
" return;\n",
" }\n",
"\n",
" const scaledMfccs = finalMfccs.map(frame =>\n",
" frame.map((value, index) =>\n",
" mfccScalerParams.scale_[index] !== 0 ?\n",
" (value - mfccScalerParams.mean_[index]) / mfccScalerParams.scale_[index] :\n",
" (value - mfccScalerParams.mean_[index])\n",
" )\n",
" );\n",
"\n",
" console.log(\"MFCCs processed and scaled, ready for tensor.\");\n",
"\n",
" const inputTensor = tf.tensor3d([scaledMfccs], [1, trainingConfig.max_pad_len_mfcc, trainingConfig.n_mfcc]);\n",
" console.log(\"Input tensor created. Shape:\", inputTensor.shape);\n",
"\n",
" makePrediction(inputTensor);\n",
" }\n",
"\n",
" async function makePrediction(inputTensor) {\n",
" if (!model) {\n",
" statusElem.textContent = 'TensorFlow.js model not yet loaded.';\n",
" predictionResultElem.textContent = '';\n",
" recordButton.disabled = false;\n",
" return;\n",
" }\n",
" statusElem.textContent = 'Prediction in progress...';\n",
" try {\n",
" const prediction = model.predict(inputTensor);\n",
" const predictionData = await prediction.data();\n",
"\n",
" tf.dispose(inputTensor);\n",
" tf.dispose(prediction);\n",
"\n",
" let maxProb = -1;\n",
" let predictedClassIndex = -1;\n",
" for (let i = 0; i < predictionData.length; i++) {\n",
" if (predictionData[i] > maxProb) {\n",
" maxProb = predictionData[i];\n",
" predictedClassIndex = i;\n",
" }\n",
" }\n",
"\n",
" if (predictedClassIndex !== -1 && labelClasses && labelClasses[predictedClassIndex]) {\n",
" const predictedLabel = labelClasses[predictedClassIndex];\n",
" predictionResultElem.textContent = `Predicted: ${predictedLabel} (Confidence: ${(maxProb * 100).toFixed(2)}%)`;\n",
" console.log(`Prediction: ${predictedLabel}, Class ID: ${predictedClassIndex}, Confidence: ${maxProb}`);\n",
" } else {\n",
" predictionResultElem.textContent = 'Error interpreting prediction or labels unavailable.';\n",
" console.error('Invalid prediction index or labelClasses undefined:', predictedClassIndex, labelClasses);\n",
" }\n",
" } catch (error) {\n",
" console.error(\"Error during prediction:\", error);\n",
" predictionResultElem.textContent = `Error during prediction: ${error.message}`;\n",
" } finally {\n",
" statusElem.textContent = 'Ready for a new recording.';\n",
" recordButton.disabled = false;\n",
" }\n",
" }\n",
"\n",
" recordButton.addEventListener('click', startRecording);\n",
"\n",
" replayButton.addEventListener('click', () => {\n",
" if (audioPlaybackElement.src && audioBlobURL) {\n",
" audioPlaybackElement.play();\n",
" } else {\n",
" console.log(\"No audio recording to play or invalid URL.\");\n",
" statusElem.textContent = \"No recording to play.\";\n",
" }\n",
" });\n",
" </script>\n",
"</body>\n",
"</html>\n",
"\"\"\"\n",
"\n",
"# Rimuovi commenti HTML ()\n",
"html_content = re.sub(r'', '', html_content, flags=re.DOTALL)\n",
"# Rimuovi commenti CSS e JS multi-linea (/* ... */)\n",
"html_content = re.sub(r'/\\*.*?\\*/', '', html_content, flags=re.DOTALL)\n",
"# Rimuovi commenti JS single-line (// ...)\n",
"html_content = re.sub(r'//.*', '', html_content)\n",
"# Rimuovi linee vuote risultanti dalla rimozione dei commenti (opzionale, per pulizia)\n",
"html_content = \"\\n\".join([line for line in html_content.splitlines() if line.strip()])\n",
"\n",
"\n",
"# Visualizza l'HTML nell'output della cella\n",
"display(HTML(html_content))\n",
"\n",
"print(\"\\n--- Instructions and Notes a web page to serve---\")\n",
"print(\"1. The HTML above has been rendered in the cell output (comments removed, text translated to English).\")\n",
"print(\"2. For the JavaScript to work correctly, the following files MUST be present in your Colab environment:\")\n",
"# Estrazione dinamica dei percorsi file per le istruzioni\n",
"# Assicurati che queste stringhe esistano ancora dopo la rimozione dei commenti e le traduzioni\n",
"try:\n",
" model_path_js = html_content.split('const MODEL_JSON_PATH = ')[1].split(';')[0].strip().strip(\"'\").strip('\"')\n",
" config_path_js = html_content.split('const CONFIG_PATH = ')[1].split(';')[0].strip().strip(\"'\").strip('\"')\n",
" scaler_path_js = html_content.split('const SCALER_PATH = ')[1].split(';')[0].strip().strip(\"'\").strip('\"')\n",
" labels_path_js = html_content.split('const LABELS_PATH = ')[1].split(';')[0].strip().strip(\"'\").strip('\"')\n",
" print(f\" - {model_path_js}\")\n",
" print(f\" - {config_path_js}\")\n",
" print(f\" - {scaler_path_js}\")\n",
" print(f\" - {labels_path_js}\")\n",
"except IndexError:\n",
" print(\" - ./tfjs_model/model.json (and associated shard files)\")\n",
" print(\" - ./training_config.json\")\n",
" print(\" - ./mfcc_scaler.json\")\n",
" print(\" - ./label_encoder_classes.json\")\n",
" print(\" (Could not dynamically parse all paths from the modified HTML, please verify them manually if needed)\")\n",
"\n",
"print(\" Upload them to the main directory (/content/) or create appropriate subdirectories (e.g., /content/tfjs_model/).\")\n",
"print(\"3. When you click 'Record and Recognize', your browser will ask for permission to use the microphone.\")\n",
"print(\"4. Make sure your browser's console (Developer Tools -> Console) is open to view logs and any JavaScript errors.\")\n",
"print(\"5. Microphone access requires a secure context (HTTPS or localhost). Colab is served via HTTPS, so it should work.\")"
]
}
],
"metadata": {
"colab": {
"provenance": [],
"gpuType": "V28",
"include_colab_link": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
},
"accelerator": "TPU"
},
"nbformat": 4,
"nbformat_minor": 0
}