Skip to content

Instantly share code, notes, and snippets.

@glowinthedark
Last active May 1, 2026 12:51
Show Gist options
  • Select an option

  • Save glowinthedark/57ffa9e04d138fcbe352a6db2b8c0ecf to your computer and use it in GitHub Desktop.

Select an option

Save glowinthedark/57ffa9e04d138fcbe352a6db2b8c0ecf to your computer and use it in GitHub Desktop.
macOS TTS with Qwen3-TTS via MLX
#!/usr/bin/env python3
"""
TTS with Qwen3-TTS via MLX.
===========================
Requirements:
pip install mlx-audio soundfile numpy
pip install tqdm # optional, for progress bars
brew install ffmpeg
Supports two modes:
MODE 1 — Voice cloning (provide --ref_audio + --ref_text)
Uses the Base model. Language is inferred automatically from the text.
python tts_longform.py \
--input_file story.txt \
--ref_audio my_voice.m4a \
--ref_text "Exact words spoken in the clip." \
--output story.wav
MODE 2 — Custom voice (no --ref_audio / --ref_text)
Uses the CustomVoice model. Pass --language and --speaker, and optionally
a natural-language --instruct to shape tone/emotion/style.
python tts_longform.py \
--input_file story.txt \
--language English \
--speaker Chelsie \
--instruct "Calm and warm narrator." \
--output story.wav
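CONVERT REFERENCE AUDIO TO 24 kHz MONO WAV BY HAND (optional — the script runs the same ffmpeg filter chain automatically for MP3/M4A/FLAC/… reference files)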
ffmpeg -y -i aug.mp3 \
-af "aformat=channel_layouts=mono,aresample=24000:resampler=soxr:dither_method=triangular,highpass=f=80,lowpass=f=8000,afftdn=nf=-23,acompressor=threshold=-21dB:ratio=2.5:attack=4:release=60:makeup=2" \
-ar 24000 -sample_fmt s16 -t 10 aug.wav
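DOWNLOAD A MODEL (the script looks under ~/models by default; adjust local_dir accordingly or pass --model)
```sh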
python -c "
from huggingface_hub import snapshot_download
snapshot_download(
repo_id='mlx-community/Qwen3-TTS-12Hz-1.7B-CustomVoice-8bit',
local_dir='./models/Qwen3-TTS-12Hz-1.7B-CustomVoice-8bit'
)
print('Done!')
"
```
NOTE: This is a Python version of the functionality exposed by the CLI tool:
mlx_audio.tts.generate \
--model ~/models/Qwen3-TTS-12Hz-1.7B-Base-8bit \
--verbose \
--output_path "out-v2-$i" \
--join_audio \
--speed 1.0 \
--pitch 0.8 \
--audio_format wav \
--lang_code es \
--gender male \
--exaggeration 0 \
--ref_audio "sample.wav" \
--ref_text "The exact transcript text of audio in sample.wav"
"""
import argparse
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import warnings
from pathlib import Path
# ── Optional: suppress HuggingFace / tokenizer warnings (disabled — uncomment the lines below to enable) ──
# os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
# warnings.filterwarnings("ignore", message=".*qwen3_tts.*instantiate a model of type.*")
# warnings.filterwarnings("ignore", message=".*incorrect regex pattern.*")
# warnings.filterwarnings("ignore", message=".*fix_mistral_regex.*")
# logging.getLogger("transformers").setLevel(logging.ERROR)
# logging.getLogger("huggingface_hub").setLevel(logging.ERROR)
# ──────────────────────────────────────────────────────────────────────────────
import numpy as np
import soundfile as sf
import mlx.core as mx
from mlx_audio.tts.utils import load_model
# tqdm is an optional dependency: when it imports, progress is drawn as bars;
# when it does not, _TQDM_AVAILABLE is False and callers fall back to plain
# status lines. The hint below is written to stderr once, at import time.
try:
    from tqdm import tqdm
except ImportError:
    _TQDM_AVAILABLE = False
    print(
        "⚠ tqdm not found — install it for nicer progress bars:\n"
        " pip install tqdm\n",
        file=sys.stderr,
    )
else:
    _TQDM_AVAILABLE = True
# ── Constants ──────────────────────────────────────────────────────────────────
# Sample rate (Hz) used for the crossfade/silence arithmetic and the output WAV.
SAMPLE_RATE = 24_000

# Length of the linear crossfade between chunks, in milliseconds — short enough
# to be inaudible, long enough to kill clicks.
CROSSFADE_MS = 25

# Silence inserted between chunks, in milliseconds — a natural breath pause.
SILENCE_MS = 250

# Maximum characters per synthesis chunk — safe context window for Qwen3-TTS.
CHUNK_SIZE = 300

# Default model locations, one per mode ("~" is expanded before loading).
MODEL_CLONING = "~/models/Qwen3-TTS-12Hz-1.7B-Base-8bit"
MODEL_CUSTOM = "~/models/Qwen3-TTS-12Hz-1.7B-CustomVoice-8bit"

# Formats mlx-audio cannot read natively — must be converted to WAV first.
_NEEDS_CONVERSION = {
    ".mp3",
    ".m4a",
    ".aac",
    ".flac",
    ".ogg",
    ".opus",
    ".wma",
    ".aiff",
    ".aif",
}

# Language names accepted by --language in custom-voice mode.
VALID_LANGUAGES = {
    "English",
    "Chinese",
    "Japanese",
    "Korean",
    "German",
    "French",
    "Russian",
    "Portuguese",
    "Spanish",
    "Italian",
}
# ── Reference audio preparation ───────────────────────────────────────────────
def prepare_ref_audio(path: str) -> str:
    """
    Return a path to reference audio in a container the model can load.

    The decision is made on the file extension only: a file whose suffix is
    not listed in ``_NEEDS_CONVERSION`` (e.g. ``.wav``) is returned unchanged —
    its sample rate and channel count are NOT inspected.  Any other file is
    converted with ffmpeg into a temporary 24 kHz mono 16-bit WAV, trimmed to
    at most 10 seconds, and the path of that temporary file is returned.

    The temporary file is created with ``delete=False``; on success the caller
    owns it and is responsible for removing it.  If the conversion fails or is
    interrupted, the temporary file is removed here before the error
    propagates.

    Raises:
        FileNotFoundError: ``path`` does not exist.
        RuntimeError: ffmpeg is not installed, or ffmpeg exited non-zero.
    """
    src = Path(path)
    if not src.exists():
        raise FileNotFoundError(f"Reference audio not found: {path}")
    if src.suffix.lower() not in _NEEDS_CONVERSION:
        return path  # extension not in the convert list — pass through

    if not shutil.which("ffmpeg"):
        raise RuntimeError(
            "ffmpeg is required to convert non-WAV reference audio but was not found.\n"
            "Install it with: brew install ffmpeg"
        )

    # Reserve a unique output name; ffmpeg overwrites the empty file ("-y").
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False, prefix="ref_audio_")
    tmp.close()
    dst = tmp.name

    cmd = [
        "ffmpeg", "-y", "-i", str(src),
        "-af", (
            "aformat=channel_layouts=mono,"
            "aresample=24000:resampler=soxr:dither_method=triangular,"
            "highpass=f=80,lowpass=f=8000,"
            "afftdn=nf=-23,"
            "acompressor=threshold=-21dB:ratio=2.5:attack=4:release=60:makeup=2"
        ),
        "-ar", "24000",
        "-sample_fmt", "s16",
        "-t", "10",  # cap at 10 s — model only uses ~3–10 s anyway
        dst,
    ]
    print(f" Converting {src.name} → temporary 24 kHz mono WAV…")

    converted = False
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(
                f"ffmpeg conversion failed:\n{result.stderr.strip()}"
            )
        converted = True
    finally:
        # Don't leave a stray temp file behind on failure or Ctrl-C.
        if not converted:
            Path(dst).unlink(missing_ok=True)

    print(f" ✓ Conversion done ({dst})")
    return dst
# ── Text splitting ─────────────────────────────────────────────────────────────
def split_text(text: str, max_chars: int = CHUNK_SIZE) -> list[str]:
    """
    Split ``text`` into synthesis chunks of at most ``max_chars`` characters.

    The text is first broken on the most natural boundary that yields pieces
    short enough, in this order of preference:

    1. blank-line paragraph breaks and sentence-ending punctuation
       (``. ! ?`` followed by whitespace, or the CJK terminators ``。！？``);
    2. comma boundaries, for a single sentence longer than ``max_chars``;
    3. spaces between words, for a comma-free run that is still too long.

    Consecutive pieces are then greedily regrouped, joined by single spaces,
    up to ``max_chars`` so each chunk carries as much prosody context as the
    limit allows.  All whitespace runs inside a chunk are collapsed to one
    space.  A single word (or an unbroken run without spaces) longer than
    ``max_chars`` is emitted as its own oversized chunk rather than cut.

    Returns:
        The non-empty chunks in reading order; ``[]`` for empty or
        whitespace-only input.
    """
    # Latin terminators must be followed by whitespace (so "3.14" survives);
    # CJK terminators are normally written with no following space.
    sentence_end = re.compile(r"(?<=[.!?])\s+|(?<=[。！？])\s*")
    clause_end = re.compile(r"(?<=,)\s+|(?<=[，、])\s*")

    # Pass 1 — cut the text into the coarsest units that fit in max_chars.
    units: list[str] = []
    # Paragraph breaks have to be located BEFORE whitespace is collapsed;
    # once "\n\n" has been turned into " " there is nothing left to split on.
    for paragraph in re.split(r"\n\s*\n", text):
        paragraph = re.sub(r"\s+", " ", paragraph.strip())
        for sentence in sentence_end.split(paragraph):
            if len(sentence) <= max_chars:
                units.append(sentence)
                continue
            # Oversized sentence — fall back to comma boundaries.
            for clause in clause_end.split(sentence):
                if len(clause) <= max_chars:
                    units.append(clause)
                else:
                    # Still too long and no commas left — split between words.
                    units.extend(clause.split(" "))

    # Pass 2 — greedily pack neighbouring units back together.
    chunks: list[str] = []
    current = ""
    for unit in units:
        if not unit:
            continue  # regex splits can produce empty leading/trailing pieces
        candidate = f"{current} {unit}" if current else unit
        if len(candidate) <= max_chars:
            current = candidate
        else:
            if current:
                chunks.append(current)
            current = unit
    if current:
        chunks.append(current)
    return chunks
# ── Audio utilities ────────────────────────────────────────────────────────────
def to_numpy(audio) -> np.ndarray:
    """
    Convert an ``mlx.core.array`` or any array-like to a float32 NumPy array.

    Singleton dimensions are squeezed away, so a ``(1, N)`` or ``(N, 1)``
    input comes back as shape ``(N,)``.  The result always has at least one
    dimension, so ``len()`` works even for a single-sample input.
    """
    if isinstance(audio, mx.array):
        # Cast on the MLX side, then convert directly instead of round-tripping
        # every sample through a Python list.  The explicit float32 cast also
        # covers dtypes NumPy cannot represent natively (e.g. bfloat16).
        arr = np.array(audio.astype(mx.float32), dtype=np.float32)
    else:
        arr = np.asarray(audio, dtype=np.float32)
    return np.atleast_1d(arr.squeeze())
def crossfade(a: np.ndarray, b: np.ndarray, samples: int) -> np.ndarray:
    """
    Linearly crossfade the tail of ``a`` into the head of ``b``.

    The last ``samples`` values of ``a`` are faded from 1 to 0, the first
    ``samples`` values of ``b`` are faded from 0 to 1, and the two faded spans
    are summed, so the result is ``samples`` shorter than ``len(a) + len(b)``.
    The overlap is clamped to the length of the shorter input.  With an
    overlap of zero or less (including an empty input) the arrays are simply
    concatenated.

    Neither input is modified.

    Returns:
        A new array: ``a`` without its tail, the blended overlap, then ``b``
        without its head.
    """
    overlap = min(samples, len(a), len(b))
    if overlap <= 0:
        return np.concatenate([a, b])

    fade_out = np.linspace(1.0, 0.0, overlap)
    fade_in = np.linspace(0.0, 1.0, overlap)
    # Only the overlapping spans are scaled (no full-array copies).  Each
    # product is cast back to its input dtype so float32 audio stays float32.
    blended = (a[-overlap:] * fade_out).astype(a.dtype) + (
        b[:overlap] * fade_in
    ).astype(b.dtype)
    return np.concatenate([a[:-overlap], blended, b[overlap:]])
def join_segments(segments: list[np.ndarray]) -> np.ndarray:
    """
    Join audio segments with a silence pad and a crossfade at every seam.

    Before each following segment is attached, ``SILENCE_MS`` of silence is
    appended to the audio so far and the result is crossfaded into the
    segment over ``CROSSFADE_MS``.  Progress is shown as a tqdm bar when tqdm
    is available and as one printed line per segment otherwise.

    Returns:
        A new array holding the joined audio.  An empty ``segments`` list
        yields an empty float32 array; a single segment yields a copy of it.
    """
    if not segments:
        return np.zeros(0, dtype=np.float32)

    cf_samples = int(SAMPLE_RATE * CROSSFADE_MS / 1000)
    sil_samples = int(SAMPLE_RATE * SILENCE_MS / 1000)
    silence = np.zeros(sil_samples, dtype=np.float32)
    total = len(segments)
    pending = segments[1:]
    if not pending:
        return segments[0].copy()

    iterator = (
        tqdm(pending, total=total - 1, desc=" Joining", unit="seg", ncols=72, colour="green")
        if _TQDM_AVAILABLE else pending
    )

    # A crossfade only ever touches the last `cf_samples` of the audio built
    # so far.  Everything before that is final, so it is parked in `finished`
    # and only the short `tail` is carried into the next seam.  This keeps the
    # join linear in the total length instead of recopying the whole signal
    # for every segment.
    finished: list[np.ndarray] = []
    tail = segments[0]
    for i, seg in enumerate(iterator, start=2):
        if not _TQDM_AVAILABLE:
            print(f" Joining segment {i}/{total}…", flush=True)
        merged = crossfade(np.concatenate([tail, silence]), seg, cf_samples)
        keep = max(0, min(cf_samples, len(merged)))
        cut = len(merged) - keep
        finished.append(merged[:cut])
        tail = merged[cut:]
    finished.append(tail)
    return np.concatenate(finished)
# ── Core TTS ──────────────────────────────────────────────────────────────────
def generate_segment_cloning(model, text: str, ref_audio: str, ref_text: str) -> np.ndarray:
    """
    Synthesise one text chunk in voice-cloning mode.

    Calls ``model.generate`` with the chunk plus the reference clip and its
    transcript, drains everything it yields, and returns the audio of the
    first result converted with ``to_numpy``.

    Raises:
        RuntimeError: the model yielded no results for ``text``.
    """
    outputs = [*model.generate(text=text, ref_audio=ref_audio, ref_text=ref_text)]
    if len(outputs) == 0:
        raise RuntimeError(f"Model returned no audio for: {text!r}")
    first, *_ = outputs
    return to_numpy(first.audio)
def generate_segment_custom(
    model, text: str, speaker: str, language: str, instruct: str
) -> np.ndarray:
    """
    Synthesise one text chunk in custom-voice mode.

    Calls ``model.generate_custom_voice`` with the named speaker, the language
    and the natural-language style instruction, drains everything it yields,
    and returns the audio of the first result converted with ``to_numpy``.

    Raises:
        RuntimeError: the model yielded no results for ``text``.
    """
    request = {
        "text": text,
        "speaker": speaker,
        "language": language,
        "instruct": instruct,
    }
    outputs = [*model.generate_custom_voice(**request)]
    if len(outputs) == 0:
        raise RuntimeError(f"Model returned no audio for: {text!r}")
    first, *_ = outputs
    return to_numpy(first.audio)
# ── Orchestration ─────────────────────────────────────────────────────────────
def run(
    text: str,
    output: str,
    # cloning mode
    ref_audio: str | None = None,
    ref_text: str | None = None,
    # custom voice mode
    language: str = "English",
    speaker: str = "Chelsie",
    instruct: str = "Speak naturally and clearly.",
    # shared
    model_path: str | None = None,
    chunk_size: int = CHUNK_SIZE,
    verbose: bool = True,
) -> None:
    """
    Synthesise ``text`` chunk by chunk and write the joined audio to ``output``.

    The mode is chosen by ``ref_audio``: when it is given, every chunk is
    generated in voice-cloning mode with ``ref_audio``/``ref_text``; otherwise
    custom-voice mode is used with ``speaker``/``language``/``instruct``.

    Args:
        text: Full input text; it is split with ``split_text``.
        output: Destination WAV path; parent directories are created.
        ref_audio: Reference clip for cloning; converted to WAV if needed.
        ref_text: Transcript of ``ref_audio``.
        language, speaker, instruct: Custom-voice settings.
        model_path: Overrides the per-mode default model location.
        chunk_size: Maximum characters per synthesis chunk.
        verbose: When false, the header, the "Loading model…" /
            "Joining segments…" lines and the generation progress are
            suppressed.  The final summary is always printed, and
            ``join_segments`` reports its own progress regardless.

    Raises:
        ValueError: ``text`` contains nothing to synthesise.
    """
    cloning_mode = ref_audio is not None

    # Resolve model path: explicit override → sensible default per mode
    resolved_model = model_path or (MODEL_CLONING if cloning_mode else MODEL_CUSTOM)
    resolved_model = str(Path(resolved_model).expanduser())

    chunks = split_text(text, max_chars=chunk_size)
    total = len(chunks)
    if not chunks:
        # Fail before the (slow) model load rather than crash while joining.
        raise ValueError("Input text is empty — nothing to synthesise.")

    if verbose:
        mode_label = "voice cloning" if cloning_mode else f"custom voice ({speaker}, {language})"
        print(f"[tts_longform] Mode : {mode_label}")
        print(f"[tts_longform] Model : {resolved_model}")
        if cloning_mode:
            print(f"[tts_longform] Ref : {ref_audio}")
        print(f"[tts_longform] Chunks: {total}")
        print(f"[tts_longform] Output: {output}\n")

    segments: list[np.ndarray] = []
    converted_ref: str | None = None  # temp WAV we created and must delete
    try:
        if cloning_mode:
            prepared = prepare_ref_audio(ref_audio)
            if prepared != ref_audio:
                converted_ref = prepared
            ref_audio = prepared

        if verbose:
            print("Loading model…")
        model = load_model(resolved_model)

        t0 = time.time()
        iterator = (
            tqdm(enumerate(chunks, 1), total=total, desc=" Generating",
                 unit="chunk", ncols=72, colour="cyan", disable=not verbose)
            if _TQDM_AVAILABLE else enumerate(chunks, 1)
        )
        for i, chunk in iterator:
            if verbose and not _TQDM_AVAILABLE:
                print(f" [{i:>3}/{total}] {chunk[:60].replace(chr(10), ' ')!r}…", flush=True)
            if cloning_mode:
                seg = generate_segment_cloning(model, chunk, ref_audio, ref_text)
            else:
                seg = generate_segment_custom(model, chunk, speaker, language, instruct)
            segments.append(seg)
    finally:
        # The reference clip is passed to the model for every chunk, so the
        # converted copy can only be removed once generation has finished
        # (or failed).
        if converted_ref is not None:
            Path(converted_ref).unlink(missing_ok=True)

    if verbose:
        print("\nJoining segments…")
    audio = join_segments(segments)

    # Normalise to −1 dBFS to avoid clipping on export
    peak = np.max(np.abs(audio)) if len(audio) else 0.0
    if peak > 0:
        audio = audio / peak * 0.891  # ≈ −1 dBFS

    out_path = Path(output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    sf.write(str(out_path), audio, SAMPLE_RATE, subtype="PCM_16")

    elapsed = time.time() - t0
    duration = len(audio) / SAMPLE_RATE
    # Guard against a zero reading from a coarse clock.
    speed = duration / elapsed if elapsed > 0 else float("inf")
    print(f"\n✓ Saved: {out_path}")
    print(f" Duration : {duration:.1f}s")
    print(f" Elapsed : {elapsed:.1f}s ({speed:.2f}× real-time)")
# ── CLI ───────────────────────────────────────────────────────────────────────
def main() -> None:
    """
    Command-line entry point: parse arguments, validate them, call ``run``.

    Voice-cloning mode is selected by ``--ref_audio`` (which then requires
    ``--ref_text``); otherwise custom-voice mode is used and ``--language``
    must be one of ``VALID_LANGUAGES``.  A ``--ref_text`` value ending in
    ``.txt`` is treated as a file whose content is the transcript.

    Invalid arguments and unreadable input files are reported through
    ``parser.error`` (usage message, exit status 2).  Empty input text prints
    an error and exits with status 1.
    """
    parser = argparse.ArgumentParser(
        description="Long-form TTS with Qwen3-TTS via MLX — voice cloning or custom voice.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    # Input text
    src = parser.add_mutually_exclusive_group(required=True)
    src.add_argument("--text", metavar="TEXT", help="Inline text to synthesise")
    src.add_argument("--input_file", metavar="FILE", help="Path to a .txt file")

    # Voice-cloning args (MODE 1)
    clone = parser.add_argument_group("Voice cloning (MODE 1) — omit for custom voice mode")
    clone.add_argument("--ref_audio", metavar="FILE",
                       help="Reference audio file (WAV, MP3, M4A, FLAC, …)")
    clone.add_argument("--ref_text", metavar="TEXT|FILE",
                       help="Transcript of reference audio, or path to a .txt file")

    # Custom-voice args (MODE 2)
    custom = parser.add_argument_group("Custom voice (MODE 2) — used when no --ref_audio given")
    custom.add_argument("--language", default="English",
                        metavar="|".join(sorted(VALID_LANGUAGES)),
                        help="Spoken language (default: English)")
    custom.add_argument("--speaker", default="Chelsie",
                        metavar="NAME",
                        help="Built-in speaker name (default: Chelsie)")
    custom.add_argument("--instruct", default="Speak naturally and clearly.",
                        metavar="PROMPT",
                        help='Natural-language style prompt, e.g. "Calm and warm narrator."')

    # Shared args
    parser.add_argument("--output", default="output.wav", metavar="FILE",
                        help="Output WAV file (default: output.wav)")
    parser.add_argument("--model", default=None, metavar="PATH|REPO",
                        help="Override model path or HuggingFace repo ID")
    parser.add_argument("--chunk_size", type=int, default=CHUNK_SIZE, metavar="N",
                        help=f"Max characters per synthesis chunk (default: {CHUNK_SIZE})")
    parser.add_argument("--quiet", action="store_true", help="Suppress progress output")
    args = parser.parse_args()

    # ── Validate mode-specific requirements ───────────────────────────────────
    cloning_mode = args.ref_audio is not None
    if cloning_mode and args.ref_text is None:
        parser.error("--ref_text is required when --ref_audio is provided")
    if not cloning_mode and (args.ref_text is not None):
        parser.error("--ref_text requires --ref_audio (voice cloning mode)")
    if not cloning_mode and args.language not in VALID_LANGUAGES:
        parser.error(
            f"Unknown language {args.language!r}. "
            f"Valid options: {', '.join(sorted(VALID_LANGUAGES))}"
        )
    if args.chunk_size < 1:
        parser.error("--chunk_size must be a positive integer")

    # ── Resolve inputs ────────────────────────────────────────────────────────
    if args.input_file:
        try:
            text = Path(args.input_file).read_text(encoding="utf-8")
        except (OSError, UnicodeError) as exc:
            # Report a usage error instead of a raw traceback.
            parser.error(f"cannot read --input_file {args.input_file!r}: {exc}")
    else:
        text = args.text

    if not text.strip():
        print("Error: input text is empty.", file=sys.stderr)
        sys.exit(1)

    if args.ref_text and args.ref_text.endswith(".txt"):
        try:
            args.ref_text = Path(args.ref_text).read_text(encoding="utf-8").strip()
        except (OSError, UnicodeError) as exc:
            parser.error(f"cannot read --ref_text file {args.ref_text!r}: {exc}")

    run(
        text=text,
        output=args.output,
        ref_audio=args.ref_audio,
        ref_text=args.ref_text,
        language=args.language,
        speaker=args.speaker,
        instruct=args.instruct,
        model_path=args.model,
        chunk_size=args.chunk_size,
        verbose=not args.quiet,
    )
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment