Skip to content

Instantly share code, notes, and snippets.

@halcy
Last active February 11, 2025 19:08
Show Gist options
  • Save halcy/7fb41ae9457f25eb5d67fa0d2ee57aaf to your computer and use it in GitHub Desktop.
Save halcy/7fb41ae9457f25eb5d67fa0d2ee57aaf to your computer and use it in GitHub Desktop.
import pyaudio
import numpy as np
import time
import queue
import threading
from collections import deque
import os
import signal
# Frequency range to check for peaks.
# No sense checking outside this range for 50Hz systems, if it's far lower
# or higher the system is in Huge Trouble™.
FREQ_MIN = 49.4
FREQ_MAX = 50.6
# Window for the FFT, in seconds. Using a pretty large window to get good freq resolution.
FFT_WINDOW_SECONDS = 25
UPDATE_DELAY = 0.5
# Log writing interval
LOG_INTERVAL_SEC = 60.0
# Basic audio recorder
class AudioRecorder:
def __init__(self, format_is_float=True):
self.queue = queue.Queue()
self.overflow_flag = False
self.format_is_float = format_is_float
self.rolling_buffer = None
self.rolling_buffer_lock = None
self.stop_event = threading.Event()
self.record_thread = None
self.stream = None
def audio_callback(self, in_data, _, __, status_flags):
if status_flags & pyaudio.paInputOverflow:
self.overflow_flag = True
self.queue.put(in_data)
return (None, pyaudio.paContinue)
def read_samples(self):
if self.queue.empty():
return None
data_list = []
while not self.queue.empty():
data_list.append(self.queue.get())
raw_data = b"".join(data_list)
if self.format_is_float:
samples = np.frombuffer(raw_data, dtype=np.float32)
else:
samples = np.frombuffer(
raw_data, dtype=np.int16).astype(np.float32)
return samples
def stop(self):
if self.record_thread is not None:
self.stop_event.set()
self.record_thread.join(timeout=1.0)
self.record_thread = None
if self.stream is not None:
self.stream.stop_stream()
self.stream.close()
self.stream = None
def stopping(self):
return self.stop_event.is_set()
def is_overflowing(self):
return self.overflow_flag
def clear_overflow(self):
self.overflow_flag = False
@staticmethod
def list_input_devices(pa):
device_count = pa.get_device_count()
info_list = []
print("Available input devices:")
for i in range(device_count):
dev_info = pa.get_device_info_by_index(i)
max_in = dev_info.get('maxInputChannels', 0)
if max_in > 0:
name = dev_info.get('name', f"Device {i}")
host_api_idx = dev_info.get('hostApi')
host_api_info = pa.get_host_api_info_by_index(host_api_idx)
api_name = host_api_info.get("name", f"API {host_api_idx}")
print(
f" Index {i}: {name} | Host API: {api_name} | Input Channels: {max_in}")
info_list.append((i, name))
return info_list
@staticmethod
def pick_best_supported_samplerate(pa, device_index, lowest=False):
COMMON_SAMPLE_RATES = [
192000, 176400, 128000, 96000, 88200, 64000,
48000, 44100, 32000, 22050, 16000, 11025, 8000
]
if lowest:
COMMON_SAMPLE_RATES.reverse()
for sr in COMMON_SAMPLE_RATES:
try:
test_stream = pa.open(
format=pyaudio.paFloat32,
channels=1,
rate=sr,
input=True,
frames_per_buffer=1024,
input_device_index=device_index
)
test_stream.close()
return sr
except Exception:
pass
return None
def record_thread_main(self):
while not self.stop_event.is_set():
time.sleep(0.01)
# Reset buffer on overflow
if self.overflow_flag:
self.overflow_flag = False
with self.rolling_buffer_lock:
self.rolling_buffer.clear()
# Read queued samples
new_samples = self.read_samples()
if new_samples is not None and len(new_samples) > 0:
with self.rolling_buffer_lock:
self.rolling_buffer.extend(new_samples)
def get_samples(self, n_samples):
with self.rolling_buffer_lock:
if len(self.rolling_buffer) < n_samples:
return None
return np.array(self.rolling_buffer, dtype=np.float32)[-n_samples:]
def start(self, pa, device_index, sample_rate, buffer_size_seconds, pa_frame_size=1024):
if self.record_thread is not None:
return
self.overflow_flag = False
self.rolling_buffer_lock = threading.Lock()
self.rolling_buffer = deque(maxlen=sample_rate * buffer_size_seconds)
self.stream = pa.open(
format=pyaudio.paFloat32 if self.format_is_float else pyaudio.paInt16,
channels=1,
rate=sample_rate,
input=True,
frames_per_buffer=pa_frame_size,
input_device_index=device_index,
stream_callback=self.audio_callback
)
self.stream.start_stream()
self.record_thread = threading.Thread(
target=self.record_thread_main, daemon=True)
self.record_thread.start()
def main():
pa = pyaudio.PyAudio()
# List devices
devices = AudioRecorder.list_input_devices(pa)
print("\nSelect input device index: ")
device_index = int(input().strip())
# Pick lowest sample rate (for speed, we don't need high sample rates to look at 50Hz)
sample_rate = AudioRecorder.pick_best_supported_samplerate(
pa, device_index, lowest=True)
if sample_rate is None:
print("No supported sample rates found.")
pa.terminate()
return
fft_window_size = int(sample_rate * FFT_WINDOW_SECONDS)
# Attempt to start recording audio
try:
recorder = AudioRecorder(format_is_float=True)
recorder.start(pa, device_index, sample_rate, FFT_WINDOW_SECONDS)
except:
print("Failed to open device.")
pa.terminate()
return
# Install a signal handler to stop everything on ctrl+c
def signal_handler(_, __):
print("\nCtrl+C pressed. Stopping threads...")
recorder.stop()
signal.signal(signal.SIGINT, signal_handler)
# Prepare for logging
os.makedirs("logs", exist_ok=True)
data_log = []
# Main loop
last_update_time = time.time()
while not recorder.stopping():
time.sleep(0.01)
now = time.time()
# Print update only every UPDATE_DELAY seconds
if (now - last_update_time) < UPDATE_DELAY:
continue
# Check if we have enough data for a full 25s window
window_samples = recorder.get_samples(fft_window_size)
if window_samples is None or len(window_samples) < fft_window_size:
continue
# Do a Hann windowed FFT
hann_window = np.hanning(fft_window_size)
windowed = window_samples * hann_window
fft_complex = np.fft.rfft(windowed)
# Slice the magnitude spectrogram to the range we care about
freqs = np.fft.rfftfreq(fft_window_size, 1.0 / sample_rate)
idx_min = np.searchsorted(freqs, FREQ_MIN)
idx_max = np.searchsorted(freqs, FREQ_MAX)
sub_mags = np.abs(fft_complex[idx_min:idx_max])
sub_freqs = freqs[idx_min:idx_max]
# Find the peak
n = np.argmax(sub_mags)
# Quadratic interpolation to refine the peak location
frac_offset = 0.0
if not (n < 1 or n >= len(sub_mags) - 1):
alpha = sub_mags[n - 1]
beta = sub_mags[n]
gamma = sub_mags[n + 1]
denom = alpha - 2*beta + gamma
if abs(denom) > 1e-12:
frac_offset = 0.5 * (alpha - gamma) / denom
n_interp = n + frac_offset
# Compute which frequency this corresponds to
lower_idx = int(np.floor(n_interp))
upper_idx = int(np.ceil(n_interp))
frac = n_interp - lower_idx
if upper_idx >= len(sub_freqs):
upper_idx = len(sub_freqs) - 1
est_freq = (1 - frac) * \
sub_freqs[lower_idx] + frac * sub_freqs[upper_idx]
# Log and print
measurement_time = time.time()
data_log.append((measurement_time, est_freq))
print(f"Freq: {est_freq:7.3f} Hz")
last_update_time = now
# Flush log if we need to
if len(data_log) > 1:
earliest_ts = data_log[0][0]
if (measurement_time - earliest_ts) >= LOG_INTERVAL_SEC:
fname = f"logs/{int(measurement_time)}.npy"
print(f"** Saving {len(data_log)} data points to {fname} **")
np.save(fname, data_log)
data_log.clear()
# Cleanup
pa.terminate()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment