|
""" |
|
MOSS-TTS-Nano — Generic TTS REST API Server |
|
============================================ |
|
|
|
Exposes three families of endpoints that together cover the widest range |
|
of automation clients (Home Assistant, Node-RED, shell scripts, etc.). |
|
|
|
Endpoints |
|
--------- |
|
GET /health |
|
Health / readiness check. |
|
|
|
GET /v1/voices |
|
List all available voice IDs (OpenAI-style envelope). |
|
|
|
POST /v1/audio/speech |
|
OpenAI TTS-compatible endpoint. |
|
Body (JSON): {"input": "...", "voice": "zh_1", "model": "...", "response_format": "wav"} |
|
Returns: raw WAV bytes Content-Type: audio/wav |
|
|
|
GET /tts?text=...&voice=... |
|
Simple single-URL GET — ideal for HA rest_command / media_player. |
|
Returns: raw WAV bytes Content-Type: audio/wav |
|
|
|
POST /tts |
|
Simple JSON POST. |
|
Body (JSON): {"text": "...", "voice": "zh_1"} |
|
Returns: raw WAV bytes Content-Type: audio/wav |
|
|
|
Voice IDs |
|
--------- |
|
Three sources are merged, checked in this priority order: |
|
|
|
1. *ONNX built-in voices* — pre-computed codes inside the model manifest. |
|
These require no reference audio file. IDs look like "Junhao", "Sarah", … |
|
(exact names depend on the downloaded model). |
|
|
|
2. *Demo preset voices* — reference WAV files from assets/demo.jsonl. |
|
ID = audio filename stem, e.g. "zh_1", "zh_6", "en_2", "jp_2", … |
|
|
|
3. *Custom voices* — any WAV / MP3 / FLAC file dropped into the |
|
``custom_voices/`` directory (configurable via --custom-voices-dir). |
|
ID = filename stem. Files are detected on every request — no restart |
|
needed after adding or replacing a file. |
|
|
|
If voice is omitted the first ONNX built-in voice is used. |
|
|
|
Adding a custom voice |
|
--------------------- |
|
Copy any reference audio file into custom_voices/:: |
|
|
|
cp my_speaker.wav custom_voices/alice.wav |
|
|
|
Then call the API with ``voice=alice``. The new voice is available |
|
immediately. |
|
|
|
Home Assistant integration examples |
|
------------------------------------ |
|
Option A — rest_command (GET): |
|
rest_command: |
|
moss_tts: |
|
url: "http://YOUR_DOCKER_HOST:18084/tts?text={{ text | urlencode }}&voice=zh_1" |
|
method: GET |
|
|
|
Option B — rest_command (POST, JSON): |
|
rest_command: |
|
moss_tts: |
|
url: "http://YOUR_DOCKER_HOST:18084/tts" |
|
method: POST |
|
headers: |
|
Content-Type: "application/json" |
|
payload: '{"text": "{{ text }}", "voice": "zh_1"}' |
|
|
|
Option C — OpenAI TTS integration (HA 2024.4+): |
|
Point the integration's base_url to http://YOUR_DOCKER_HOST:18084 |
|
(the /v1/audio/speech route is fully compatible). |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import argparse |
|
import io |
|
import json |
|
import logging |
|
import subprocess |
|
import tempfile |
|
import threading |
|
import wave |
|
from pathlib import Path |
|
from typing import Any, Optional |
|
|
|
import numpy as np |
|
import uvicorn |
|
from fastapi import FastAPI, HTTPException, Query |
|
from fastapi.responses import JSONResponse, Response |
|
from pydantic import BaseModel |
|
|
|
from onnx_tts_runtime import ( |
|
OnnxTtsRuntime, |
|
ensure_browser_onnx_model_dir, |
|
) |
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# Request models (module-level — Pydantic v2 requires this for OpenAPI schema) |
|
# --------------------------------------------------------------------------- |
|
|
|
class SpeechRequest(BaseModel):
    """OpenAI-compatible TTS request body (POST /v1/audio/speech)."""

    # Model name — accepted for OpenAI-client compatibility; the server always
    # uses its single loaded runtime regardless of this value.
    model: str = "moss-tts-nano"
    # Text to synthesize (OpenAI's field name for the input text).
    input: str
    # Voice ID from GET /v1/voices; empty string selects the server default.
    voice: str = ""
    # Output container; the endpoint accepts "wav", "pcm" or "" but always
    # returns WAV bytes.
    response_format: str = "wav"
    speed: float = 1.0  # accepted for compatibility; ignored
    # Random seed forwarded to the runtime; None means non-deterministic output.
    seed: int | None = None
    sample_mode: str = "fixed"  # "fixed" | "full" | "greedy"

    # Example payload shown in the generated OpenAPI / Swagger UI docs.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "model": "moss-tts-nano",
                    "input": "Hello, this is a test.",
                    "voice": "zh_1",
                    "response_format": "wav",
                    "seed": 42,
                    "sample_mode": "fixed",
                }
            ]
        }
    }
|
|
|
|
|
class TtsRequest(BaseModel):
    """Simple TTS POST request body (POST /tts)."""

    # Text to synthesize.
    text: str
    # Voice ID from GET /v1/voices; empty string selects the server default.
    voice: str = ""
    # Random seed forwarded to the runtime; None means non-deterministic output.
    seed: int | None = None
    sample_mode: str = "fixed"  # "fixed" | "full" | "greedy"

    # Example payloads shown in the generated OpenAPI / Swagger UI docs.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {"text": "Hello, this is a test.", "voice": "zh_1", "seed": 42, "sample_mode": "fixed"},
                {"text": "你好，这是一个测试。", "voice": "zh_1", "seed": 42, "sample_mode": "fixed"},
            ]
        }
    }
|
|
|
# Directory containing this file; bundled assets and relative "role" paths
# from the demo manifest are resolved against it.
APP_DIR = Path(__file__).resolve().parent
# Manifest of demo preset voices: one JSON object per line, whose "role"
# field points at a reference WAV file.
DEMO_METADATA_PATH = APP_DIR / "assets" / "demo.jsonl"
# Default directory that is live-scanned for user-supplied voice samples.
DEFAULT_CUSTOM_VOICES_DIR = APP_DIR / "custom_voices"

# Audio file extensions recognised as voice samples.
_AUDIO_EXTENSIONS = {".wav", ".mp3", ".flac", ".ogg", ".m4a"}

# Extensions that libsndfile (soundfile) can read natively.
# Everything else is pre-converted to WAV by ffmpeg before being passed to
# torchaudio, which otherwise silently falls back to the soundfile backend
# and fails on AAC / M4A / MP3 files.
_SOUNDFILE_NATIVE_EXTENSIONS = {".wav", ".flac", ".aif", ".aiff", ".ogg"}

# Maps the first word of a manifest "group" field to a BCP-47-style language tag.
_GROUP_LANG_MAP: dict[str, str] = {
    "Chinese": "zh",
    "English": "en",
    "Japanese": "ja",
    "Korean": "ko",
    "French": "fr",
    "German": "de",
    "Spanish": "es",
    "Portuguese": "pt",
}
|
|
|
|
|
def _parse_voice_group(group: str) -> tuple[str | None, str | None]: |
|
"""Parse a manifest group string like 'Chinese Male' into (language_code, gender).""" |
|
parts = str(group or "").strip().split() |
|
language = _GROUP_LANG_MAP.get(parts[0]) if parts else None |
|
gender = parts[1].lower() if len(parts) >= 2 else None |
|
return language, gender |
|
|
|
|
|
# Module-level logger; handlers/level are configured by main() via basicConfig.
logger = logging.getLogger(__name__)
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# Audio helpers |
|
# --------------------------------------------------------------------------- |
|
|
|
def _waveform_to_wav_bytes(waveform: np.ndarray, sample_rate: int) -> bytes: |
|
"""Convert a float32 numpy waveform (samples × channels) to raw WAV bytes.""" |
|
audio = np.asarray(waveform, dtype=np.float32) |
|
if audio.ndim == 1: |
|
audio = audio.reshape(-1, 1) |
|
clipped = np.clip(audio, -1.0, 1.0) |
|
pcm16 = np.round(clipped * 32767.0).astype(np.int16) |
|
buf = io.BytesIO() |
|
with wave.open(buf, "wb") as wav_file: |
|
wav_file.setnchannels(int(pcm16.shape[1])) |
|
wav_file.setsampwidth(2) |
|
wav_file.setframerate(int(sample_rate)) |
|
wav_file.writeframes(pcm16.tobytes()) |
|
return buf.getvalue() |
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# Audio format conversion helper |
|
# --------------------------------------------------------------------------- |
|
|
|
def _to_wav_if_needed(path: str) -> tuple[str, bool]:
    """Return (path_to_use, is_temp_file).

    If the file's extension is not natively readable by libsndfile, convert it
    to a temporary WAV file (48 kHz, stereo, 16-bit) via ffmpeg and return the
    temp path (is_temp=True). The caller is responsible for deleting the temp
    file when done.

    Raises:
        RuntimeError: if ffmpeg is not installed or the conversion fails.
    """
    if Path(path).suffix.lower() in _SOUNDFILE_NATIVE_EXTENSIONS:
        return path, False
    # Create the target file up front so ffmpeg can overwrite it (-y); only
    # the name is needed, so the handle is closed immediately.
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp.close()
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", path, "-ar", "48000", "-ac", "2",
             "-sample_fmt", "s16", tmp.name],
            check=True,
            capture_output=True,
        )
    except FileNotFoundError:
        Path(tmp.name).unlink(missing_ok=True)
        raise RuntimeError(
            "ffmpeg not found. Install ffmpeg to support non-WAV audio formats."
        )
    except subprocess.CalledProcessError as exc:
        Path(tmp.name).unlink(missing_ok=True)
        # Surface only the last stderr line (ffmpeg logs are verbose).
        # BUG FIX: splitlines()[-1] raised IndexError when stderr was empty,
        # masking the real failure; fall back to the exit code instead.
        stderr_lines = exc.stderr.decode(errors="replace").strip().splitlines()
        reason = stderr_lines[-1] if stderr_lines else f"exit code {exc.returncode}"
        raise RuntimeError(
            f"ffmpeg failed to convert '{Path(path).name}': {reason}"
        ) from exc
    logger.debug("Converted '%s' to temp WAV: %s", path, tmp.name)
    return tmp.name, True
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# Voice catalog helpers |
|
# --------------------------------------------------------------------------- |
|
|
|
def _load_demo_voices(demo_jsonl_path: Path) -> dict[str, Path]:
    """Build the demo preset voice catalog from assets/demo.jsonl.

    Each JSONL entry's "role" field is a path (relative to the app directory)
    to a reference WAV file; the voice ID is that file's stem, e.g.
    "assets/audio/zh_1.wav" → "zh_1". Malformed JSON lines and entries whose
    WAV file is missing on disk are skipped. Returns voice_id → absolute WAV
    path, keeping the first occurrence of each ID.
    """
    catalog: dict[str, Path] = {}
    if not demo_jsonl_path.is_file():
        logger.warning("demo.jsonl not found at %s — demo preset voices unavailable.", demo_jsonl_path)
        return catalog
    with demo_jsonl_path.open(encoding="utf-8") as handle:
        for raw_line in handle:
            raw_line = raw_line.strip()
            if not raw_line:
                continue
            try:
                record = json.loads(raw_line)
            except json.JSONDecodeError:
                continue
            role = str(record.get("role", "")).strip()
            if not role:
                continue
            sample_path = (APP_DIR / role).resolve()
            if not sample_path.is_file():
                continue
            # setdefault: the first occurrence of a voice ID wins.
            catalog.setdefault(sample_path.stem, sample_path)
    return catalog
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# Custom voices helpers |
|
# --------------------------------------------------------------------------- |
|
|
|
def _scan_custom_voices(custom_voices_dir: Path) -> dict[str, Path]:
    """Build the custom voice catalog by scanning *custom_voices_dir*.

    Voice ID = file stem (e.g. ``alice.wav`` → ``alice``). Hidden files
    (names starting with ``.``) and files without a recognised audio
    extension are skipped. When several files share a stem (different
    extensions) the first in alphabetical order wins. Returns
    voice_id → absolute path; empty when the directory does not exist.
    """
    catalog: dict[str, Path] = {}
    if not custom_voices_dir.is_dir():
        return catalog
    for candidate in sorted(custom_voices_dir.iterdir()):
        is_voice_sample = (
            not candidate.name.startswith(".")
            and candidate.is_file()
            and candidate.suffix.lower() in _AUDIO_EXTENSIONS
        )
        if is_voice_sample:
            # setdefault keeps the alphabetically-first file per stem.
            catalog.setdefault(candidate.stem, candidate.resolve())
    return catalog
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# FastAPI application factory |
|
# --------------------------------------------------------------------------- |
|
|
|
def create_app(
    *,
    model_dir: str | Path | None = None,
    custom_voices_dir: str | Path | None = None,
    cpu_threads: int = 4,
    max_new_frames: int = 375,
    default_voice: str = "",
) -> FastAPI:
    """Build and return the configured FastAPI application.

    Args:
        model_dir: ONNX model directory; ``None`` lets
            ``ensure_browser_onnx_model_dir`` resolve the location.
        custom_voices_dir: Directory live-scanned for custom voice samples.
            Created if missing; defaults to ``DEFAULT_CUSTOM_VOICES_DIR``.
        cpu_threads: ONNX runtime intra-op thread count (clamped to >= 1).
        max_new_frames: Maximum generated audio frames per synthesis call.
        default_voice: Voice ID used when a request omits ``voice``. When
            empty, the first ONNX built-in voice (if any) is used.

    Returns:
        A fully wired ``FastAPI`` app exposing /health, /v1/voices,
        /v1/audio/speech, GET /tts and POST /tts.
    """

    # ── Model initialisation ────────────────────────────────────────────────
    logger.info("Ensuring ONNX model assets are present …")
    resolved_model_dir = ensure_browser_onnx_model_dir(model_dir)
    logger.info("Loading ONNX runtime from %s …", resolved_model_dir)
    runtime = OnnxTtsRuntime(
        model_dir=resolved_model_dir,
        thread_count=max(1, int(cpu_threads)),
        max_new_frames=max_new_frames,
    )
    logger.info("ONNX runtime loaded.")

    # ── Voice catalogs ──────────────────────────────────────────────────────
    onnx_voices: list[dict[str, Any]] = runtime.list_builtin_voices()

    # Build a metadata dict: voice_id -> {display_name, language, gender, group}
    def _build_onnx_voice_meta() -> dict[str, dict[str, Any]]:
        meta: dict[str, dict[str, Any]] = {}
        for v in onnx_voices:
            vid = str(v["voice"])
            lang, gender = _parse_voice_group(str(v.get("group", "")))
            entry: dict[str, Any] = {}
            # Missing / falsy manifest fields are simply omitted from the entry.
            if v.get("display_name"):
                entry["display_name"] = str(v["display_name"])
            if v.get("group"):
                entry["group"] = str(v["group"])
            if lang:
                entry["language"] = lang
            if gender:
                entry["gender"] = gender
            meta[vid] = entry
        return meta

    onnx_voice_meta: dict[str, dict[str, Any]] = _build_onnx_voice_meta()
    onnx_voice_ids: set[str] = set(onnx_voice_meta)
    demo_voices: dict[str, Path] = _load_demo_voices(DEMO_METADATA_PATH)

    # Custom voices directory — scanned live on every request.
    _custom_voices_dir = Path(custom_voices_dir or DEFAULT_CUSTOM_VOICES_DIR).expanduser().resolve()
    _custom_voices_dir.mkdir(parents=True, exist_ok=True)
    logger.info("Custom voices directory: %s", _custom_voices_dir)

    # Initial scan is only used for the startup log line below; requests
    # always re-scan the directory.
    _initial_custom = _scan_custom_voices(_custom_voices_dir)
    all_voice_ids: set[str] = onnx_voice_ids | set(demo_voices.keys()) | set(_initial_custom.keys())
    logger.info(
        "Voice catalog: %d ONNX built-in, %d demo preset, %d custom. Total: %d.",
        len(onnx_voice_ids),
        len(demo_voices),
        len(_initial_custom),
        len(all_voice_ids),
    )

    # ── Default voice ───────────────────────────────────────────────────────
    _default_voice = str(default_voice or "").strip()
    if not _default_voice:
        _default_voice = str(onnx_voices[0]["voice"]) if onnx_voices else ""
    logger.info("Default voice: %s", _default_voice or "(none)")

    # ── Concurrency lock ────────────────────────────────────────────────────
    # The ONNX runtime is not designed for concurrent access; serialise calls.
    _inference_lock = threading.Lock()

    app = FastAPI(
        title="MOSS-TTS-Nano API",
        description=(
            "Generic TTS REST API backed by **MOSS-TTS-Nano** (ONNX CPU backend).\n\n"
            "## Audio responses\n"
            "All TTS endpoints return raw **WAV** audio (`audio/wav`). "
            "In Swagger UI click **Execute** → the response body will appear as a "
            "downloadable file link.\n\n"
            "## Voice IDs\n"
            "Call `GET /v1/voices` to list all available voice IDs. Three sources "
            "are merged:\n"
            "- **onnx_builtin** — pre-baked voices in the ONNX model manifest\n"
            "- **demo_preset** — reference WAV files from `assets/demo.jsonl`\n"
            "- **custom** — any audio file dropped into `custom_voices/` "
            "(live-scanned, no restart required)\n\n"
            "## Quick test\n"
            "```\nGET /tts?text=Hello+world\n```"
        ),
        version="1.0.0",
        openapi_tags=[
            {"name": "info", "description": "Health and voice catalog."},
            {"name": "openai", "description": "OpenAI-compatible `/v1/audio/speech` endpoint."},
            {"name": "simple", "description": "Lightweight GET / POST endpoints for automation clients."},
        ],
    )

    # ── Voice resolution helper ─────────────────────────────────────────────
    def _resolve_voice(voice_id: str) -> tuple[str | None, str | None]:
        """Resolve a voice_id to (onnx_voice, prompt_audio_path).

        Exactly one element of the returned tuple is non-None on success.

        Priority:
          1. Exact match in ONNX built-in voices  → onnx_voice, no audio file
          2. Exact match in demo preset voices    → prompt_audio_path
          3. Exact match in custom voices dir     → prompt_audio_path (live scan)
          4. Fall back to default voice
          5. Raise HTTP 400 if nothing resolves

        The fallback recursion is bounded: the recursive call passes
        _default_voice, so `vid != _default_voice` is False on that call and
        it either resolves or raises.
        """
        vid = str(voice_id or "").strip() or _default_voice
        if vid in onnx_voice_ids:
            return vid, None
        if vid in demo_voices:
            return None, str(demo_voices[vid])
        # Live scan — picks up files added after startup without a restart.
        custom_voices = _scan_custom_voices(_custom_voices_dir)
        if vid in custom_voices:
            logger.info("Using custom voice '%s' from %s", vid, custom_voices[vid])
            return None, str(custom_voices[vid])
        if vid and vid != _default_voice:
            logger.warning("Voice '%s' not found; falling back to default '%s'.", vid, _default_voice)
            return _resolve_voice(_default_voice)
        raise HTTPException(
            status_code=400,
            detail=(
                f"Voice '{vid}' not found. "
                "Call GET /v1/voices for the list of available IDs."
            ),
        )

    # ── Core synthesis helper ───────────────────────────────────────────────
    def _synthesize(
        text: str,
        voice_id: str,
        seed: int | None = None,
        sample_mode: str = "fixed",
    ) -> bytes:
        """Run one synthesis call and return the result as WAV bytes.

        Raises HTTPException 400 for empty text / unknown voice and 422 when
        a reference audio file cannot be converted.
        """
        text = str(text or "").strip()
        if not text:
            raise HTTPException(status_code=400, detail="Text must not be empty.")
        resolved_voice, prompt_audio_path = _resolve_voice(voice_id)

        # Convert audio to WAV if soundfile can't handle the source format
        # (e.g. m4a/AAC from iPhone, mp3, etc.).
        tmp_wav: str | None = None
        if prompt_audio_path is not None:
            try:
                prompt_audio_path, is_temp = _to_wav_if_needed(prompt_audio_path)
            except RuntimeError as exc:
                raise HTTPException(status_code=422, detail=str(exc)) from exc
            if is_temp:
                tmp_wav = prompt_audio_path

        try:
            # Serialise inference: the runtime is not safe for concurrent use.
            with _inference_lock:
                result = runtime.synthesize(
                    text=text,
                    voice=resolved_voice,
                    prompt_audio_path=prompt_audio_path,
                    enable_wetext=False,
                    enable_normalize_tts_text=True,
                    seed=seed,
                    sample_mode=sample_mode,
                )
        finally:
            # Always remove the temporary converted WAV, even on failure.
            if tmp_wav is not None:
                Path(tmp_wav).unlink(missing_ok=True)

        return _waveform_to_wav_bytes(
            np.asarray(result["waveform"], dtype=np.float32),
            int(result["sample_rate"]),
        )

    # ── /health ─────────────────────────────────────────────────────────────
    @app.get("/health", tags=["info"], summary="Service health / readiness check")
    def health():
        # Custom voices are re-scanned per request so the count is always live.
        custom_voices = _scan_custom_voices(_custom_voices_dir)
        return {
            "status": "ok",
            "model": "moss-tts-nano",
            "backend": "onnx",
            "default_voice": _default_voice,
            "builtin_voices": len(onnx_voice_ids),
            "demo_preset_voices": len(demo_voices),
            "custom_voices": len(custom_voices),
            "custom_voices_dir": str(_custom_voices_dir),
        }

    # ── /v1/voices ──────────────────────────────────────────────────────────
    @app.get(
        "/v1/voices",
        tags=["info"],
        summary="List available voice IDs",
        description=(
            "Returns all voice IDs from three sources: ONNX built-in, demo presets, "
            "and any files currently present in `custom_voices/` (live scan)."
        ),
    )
    def list_voices():
        custom_voices = _scan_custom_voices(_custom_voices_dir)
        data = []
        for vid in sorted(onnx_voice_ids):
            entry = {"id": vid, "object": "voice", "source": "onnx_builtin"}
            entry.update(onnx_voice_meta.get(vid, {}))
            data.append(entry)
        for vid in sorted(demo_voices):
            data.append({"id": vid, "object": "voice", "source": "demo_preset"})
        for vid, path in sorted(custom_voices.items()):
            data.append({"id": vid, "object": "voice", "source": "custom", "file": path.name})
        # OpenAI-style list envelope.
        return {"object": "list", "data": data}

    # ── POST /v1/audio/speech (OpenAI-compatible) ───────────────────────────
    @app.post(
        "/v1/audio/speech",
        tags=["openai"],
        summary="OpenAI-compatible TTS endpoint",
        description=(
            "Drop-in replacement for the OpenAI `/v1/audio/speech` endpoint. "
            "Returns a raw WAV file. "
            "Set `voice` to any ID from `GET /v1/voices`; omit to use the default voice."
        ),
        response_class=Response,
        responses={
            200: {
                "content": {"audio/wav": {}},
                "description": "WAV audio file (48 kHz stereo).",
            }
        },
    )
    def speech(req: SpeechRequest):
        # "pcm" and "" are accepted for client compatibility, but the response
        # is always a full WAV container.
        if req.response_format not in {"wav", "pcm", ""}:
            raise HTTPException(
                status_code=400,
                detail=(
                    f"Unsupported response_format '{req.response_format}'. "
                    "Only 'wav' is currently supported."
                ),
            )
        wav_bytes = _synthesize(req.input, req.voice, seed=req.seed, sample_mode=req.sample_mode)
        return Response(content=wav_bytes, media_type="audio/wav")

    # ── GET /tts (simple GET for HA / shell / URL players) ─────────────────
    @app.get(
        "/tts",
        tags=["simple"],
        summary="Simple GET TTS — returns WAV audio",
        description=(
            "Synthesize speech via query parameters. "
            "Ideal for Home Assistant `rest_command`, URL-based media players, and shell scripts. "
            "Example: `/tts?text=Hello+world&voice=zh_1`"
        ),
        response_class=Response,
        responses={
            200: {
                "content": {"audio/wav": {}},
                "description": "WAV audio file (48 kHz stereo).",
            }
        },
    )
    def tts_get(
        text: str = Query(
            ...,
            description="Text to synthesize.",
            openapi_examples={
                "english": {"summary": "English", "value": "Hello, this is a test."},
                "chinese": {"summary": "Chinese", "value": "你好，这是一个测试。"},
            },
        ),
        voice: str = Query(
            "",
            description="Voice ID. Leave blank to use the default voice. Call `GET /v1/voices` for the full list.",
        ),
        seed: int | None = Query(
            None,
            description="Random seed for deterministic output. Omit for random generation. Same seed + same inputs always produce the same audio.",
        ),
        sample_mode: str = Query(
            "fixed",
            description="Sampling mode: `fixed` (default, fast), `full` (uses temperature/top-p/top-k), `greedy` (fully deterministic, no randomness).",
        ),
    ):
        wav_bytes = _synthesize(text, voice, seed=seed, sample_mode=sample_mode)
        return Response(content=wav_bytes, media_type="audio/wav")

    # ── POST /tts (JSON body) ───────────────────────────────────────────────
    @app.post(
        "/tts",
        tags=["simple"],
        summary="Simple POST TTS — returns WAV audio",
        description="Synthesize speech from a JSON body. Returns raw WAV bytes.",
        response_class=Response,
        responses={
            200: {
                "content": {"audio/wav": {}},
                "description": "WAV audio file (48 kHz stereo).",
            }
        },
    )
    def tts_post(req: TtsRequest):
        wav_bytes = _synthesize(req.text, req.voice, seed=req.seed, sample_mode=req.sample_mode)
        return Response(content=wav_bytes, media_type="audio/wav")

    return app
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# Entry point |
|
# --------------------------------------------------------------------------- |
|
|
|
def main() -> None:
    """Parse CLI arguments, build the FastAPI app and serve it with uvicorn."""
    cli = argparse.ArgumentParser(
        description="MOSS-TTS-Nano Generic TTS API Server",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    cli.add_argument(
        "--model-dir",
        default=None,
        help="ONNX model directory. Auto-downloads to ./models if omitted.",
    )
    cli.add_argument("--host", default="0.0.0.0", help="Bind host.")
    cli.add_argument("--port", type=int, default=18084, help="Bind port.")
    cli.add_argument(
        "--cpu-threads",
        type=int,
        default=4,
        help="ONNX runtime intra-op thread count.",
    )
    cli.add_argument(
        "--max-new-frames",
        type=int,
        default=375,
        help="Maximum generated audio frames per synthesis call.",
    )
    cli.add_argument(
        "--default-voice",
        default="",
        help="Default voice ID when the caller omits the voice parameter.",
    )
    cli.add_argument(
        "--custom-voices-dir",
        default=str(DEFAULT_CUSTOM_VOICES_DIR),
        help=(
            "Directory to scan for custom voice sample files (WAV/MP3/FLAC/OGG/M4A). "
            "Files added here are available immediately without a restart. "
            f"Default: {DEFAULT_CUSTOM_VOICES_DIR}"
        ),
    )
    options = cli.parse_args()

    # Configure logging before create_app so model-loading progress is visible.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        level=logging.INFO,
    )

    application = create_app(
        model_dir=options.model_dir,
        custom_voices_dir=options.custom_voices_dir,
        cpu_threads=options.cpu_threads,
        max_new_frames=options.max_new_frames,
        default_voice=options.default_voice,
    )

    # Blocks until the server is shut down.
    uvicorn.run(application, host=options.host, port=options.port, log_level="info")
|
|
|
|
|
# Script entry point — allows `python <this file>` in addition to importing
# create_app() from another module.
if __name__ == "__main__":
    main()