Last active
March 16, 2022 21:50
-
-
Save jboone/836db0cbc8c5cce45ba8a9849c822567 to your computer and use it in GitHub Desktop.
Goertzel-based DTMF detection.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import wave | |
import numpy | |
import scipy.signal | |
def detect_tone(x, f, fs, step_size): | |
# Apply Goertzel algorithm | |
# Use a window of a nice round multiple of the period of the frequency we're detecting. | |
n = 200 #int(round(fs / f * 16)) | |
# Calculate coefficients to multiply the sample window against | |
w0 = 2 * numpy.pi * f / fs | |
coeffs = numpy.exp( | |
numpy.arange(n, dtype=numpy.complex64) * w0 * -1j | |
) / n | |
# Output array | |
output_len = len(x) // step_size | |
result = numpy.zeros((output_len,), dtype=numpy.float32) | |
# Step through samples, taking a window each step and applying the Goertzel coefficients. | |
for i in range(output_len): | |
chunk = x[i*step_size:i*step_size+n] | |
if len(chunk) == n: | |
result[i] = numpy.abs(numpy.dot(coeffs, chunk)) | |
return result | |
# Expected WAV file sampling rate. | |
fs = 8000 | |
# Time resolution of our detections. We do a detection at every `step` samples. | |
step_size = 20 | |
# Read WAV file and check that code assumptions are true of the file. | |
wav = wave.open(sys.argv[1], mode='rb') | |
assert(wav.getparams()[:3] == (1, 2, fs)) | |
assert(wav.getparams()[4] == 'NONE') | |
frames = wav.getparams()[3] | |
# Convert the WAV int16s into floats scaled to +/-1 full-scale. | |
samples_bytes = wav.readframes(frames) | |
samples = numpy.frombuffer(samples_bytes, dtype=numpy.int16) | |
samples = numpy.array(samples, dtype=numpy.float32) | |
samples /= 32768 | |
# For each DTMF tone, do detection on the entire WAV file. | |
mag_map = {} | |
for f in (697, 770, 852, 941, 1209, 1336, 1477, 1633): | |
tone_mag = detect_tone(samples, f, fs, step_size) | |
assert(tone_mag.dtype == numpy.float32) | |
tone_mag.tofile('{:04d}.f32'.format(f)) | |
mag_map[f] = tone_mag | |
# Grab the lengths of each detection magnitude array. They should all be the same length. | |
mag_lengths = [len(d) for d in mag_map.values()] | |
mag_length = mag_lengths[0] | |
# Across all frequencies in the low DTMF band, get the maximum magnitude and normalize. | |
# Then build a [n][4] array where the first dimension is time (in steps) and the | |
# second dimension is the magnitudes of the four frequencies. | |
low_band_max = max([max(mag_map[f]) for f in (697, 770, 852, 941)]) | |
low_band = numpy.zeros((mag_length, 4), dtype=numpy.float32) | |
low_band[:,0] = mag_map[697] / low_band_max | |
low_band[:,1] = mag_map[770] / low_band_max | |
low_band[:,2] = mag_map[852] / low_band_max | |
low_band[:,3] = mag_map[941] / low_band_max | |
low_band.tofile('lo_x_4.f32') | |
# Do same for high band as for low band, above. | |
high_band_max = max([max(mag_map[f]) for f in (1209, 1336, 1477, 1633)]) | |
high_band = numpy.zeros((mag_length, 4), dtype=numpy.float32) | |
high_band[:,0] = mag_map[1209] / high_band_max | |
high_band[:,1] = mag_map[1336] / high_band_max | |
high_band[:,2] = mag_map[1477] / high_band_max | |
high_band[:,3] = mag_map[1633] / high_band_max | |
high_band.tofile('hi_x_4.f32') | |
# Table to map low and high frequency indices to DTMF characters. | |
dtmf_char_map = ( | |
('1', '4', '7', '*'), # column 1209 | |
('2', '5', '8', '0'), # column 1336 | |
('3', '6', '9', '#'), # column 1477 | |
('A', 'B', 'C', 'D'), # column 1633 | |
) | |
def detect_digits(samples, detect_threshold=0.25): | |
# Detection threshold: At least one frequency in the band must be above this | |
# threshold to be considered a valid DTMF character event. | |
# Walk through the time steps from the DTMF detection arrays. | |
raw = [] | |
detections = [] | |
dtmf_char_last = None | |
dtmf_char_time_start = 0 | |
for step, pair in enumerate(zip(low_band, high_band)): | |
time_samples = step * step_size | |
time_seconds = time_samples / fs | |
lo, hi = pair | |
# Identify strongest frequency in the band, and if that frequency crosses the threshold. | |
lo_max = max(lo) | |
lo_arg = numpy.argmax(lo) | |
lo_det = lo_max >= detect_threshold | |
hi_max = max(hi) | |
hi_arg = numpy.argmax(hi) | |
hi_det = hi_max >= detect_threshold | |
# If we detect a frequency in both the low and high bands, we can decode a DTMF character. | |
dtmf_char = dtmf_char_map[hi_arg][lo_arg] if lo_det and hi_det else None | |
if dtmf_char_last != dtmf_char: | |
duration = time_seconds - dtmf_char_time_start | |
detection = { | |
'timestamp': dtmf_char_time_start, | |
'duration': duration, | |
'character': dtmf_char_last, | |
} | |
detections.append(detection) | |
dtmf_char_last = dtmf_char | |
dtmf_char_time_start = time_seconds | |
raw.append(dtmf_char if dtmf_char is not None else '_') | |
# TODO: The last detected tone is not printed out. | |
return { | |
'raw': ''.join(raw), | |
'detections': detections, | |
} | |
def print_detections(detections, minimum_duration=0.0, output_breaks=True): | |
for detection in detections: | |
if output_breaks or (detection['character'] is not None): | |
if detection['duration'] >= minimum_duration: | |
dtmf_char_str = detection['character'] if detection['character'] is not None else ' ' | |
print('{:8.4f} {:8.4f} {:s}'.format(detection['timestamp'], detection['duration'], dtmf_char_str)) | |
detections_result = detect_digits(samples) | |
raw = detections_result['raw'] | |
print(raw) | |
detections = detections_result['detections'] | |
for minimum_duration in numpy.arange(0, 0.03, 0.0025): | |
chars = [detection['character'] for detection in detections if detection['duration'] >= minimum_duration and detection['character'] is not None] | |
chars_str = ''.join(chars) | |
print('>={:6.4f} {:s}'.format(minimum_duration, chars_str)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment