Created
May 24, 2025 12:11
-
-
Save ingoogni/938806134951545cbfe6dfbfd3c730c6 to your computer and use it in GitHub Desktop.
Goertzel generalized filter & filter bank
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import strutils | |
import math | |
import complex | |
const | |
SampleRate = 8000.0 # 8kHz | |
TargetFrequency = 941.0 # 941 Hz | |
FFTSize = 205 | |
N = FFTSize # Block size | |
type | |
Sample = uint8 | |
# Generalized Goertzel filter | |
GeneralizedGoertzelFilter = object | |
frequency: float | |
omega: float # 2*pi*frequencyIndex/N | |
coeff: float # 2*cos(omega) | |
complexConstant: Complex64 # exp(-i*omega) | |
s0, s1, s2: float # State variables | |
sampleCount: int # number of processed samples | |
proc initGeneralizedGoertzel(frequency: float): GeneralizedGoertzelFilter = | |
let | |
floatN = float(N) | |
frequencyIndex = frequency * floatN / SampleRate # Can be non-integer | |
omega = 2.0 * Pi * frequencyIndex / floatN | |
return GeneralizedGoertzelFilter( | |
frequency: frequency, | |
omega: omega, | |
coeff: 2.0 * cos(omega), | |
complexConstant: complex64(cos(-omega), sin(-omega)), # exp(-i*omega) | |
s0: 0.0, | |
s1: 0.0, | |
s2: 0.0, | |
sampleCount: 0 | |
) | |
proc resetGeneralizedGoertzel(gf: var GeneralizedGoertzelFilter) = | |
## Reset the filter before processing a new block | |
gf.s0 = 0.0 | |
gf.s1 = 0.0 | |
gf.s2 = 0.0 | |
gf.sampleCount = 0 | |
proc processSampleGeneralized(gf: var GeneralizedGoertzelFilter, sample: Sample) = | |
gf.sampleCount += 1 | |
# For all but the last sample, update state variables | |
if gf.sampleCount < N: | |
gf.s0 = float(sample) + gf.coeff * gf.s1 - gf.s2 | |
gf.s2 = gf.s1 | |
gf.s1 = gf.s0 | |
else: | |
# Process the last sample differently | |
gf.s0 = float(sample) + gf.coeff * gf.s1 - gf.s2 | |
proc getComplexResult(gf: GeneralizedGoertzelFilter): Complex64 = | |
# Direct calculation from the state variables | |
result = complex64(gf.s0, 0.0) - complex64(gf.s1 * gf.complexConstant.re, gf.s1 * gf.complexConstant.im) | |
# Phase correction for non-integer frequency indices | |
let phaseCorrection = complex64( | |
cos(-gf.omega * float(N-1)), | |
sin(-gf.omega * float(N-1)) | |
) | |
result = complex64( | |
result.re * phaseCorrection.re - result.im * phaseCorrection.im, | |
result.re * phaseCorrection.im + result.im * phaseCorrection.re | |
) | |
return result | |
proc processBlockGeneralized(gf: var GeneralizedGoertzelFilter, samples: openarray[Sample]): Complex64 = | |
gf.resetGeneralizedGoertzel() | |
for i in 0..<(samples.len-1): | |
gf.sampleCount += 1 | |
gf.s0 = float(samples[i]) + gf.coeff * gf.s1 - gf.s2 | |
gf.s2 = gf.s1 | |
gf.s1 = gf.s0 | |
# Process the last sample | |
gf.sampleCount += 1 | |
gf.s0 = float(samples[samples.len-1]) + gf.coeff * gf.s1 - gf.s2 | |
return gf.getComplexResult() | |
# Process multiple frequencies "at once" | |
proc goertzelGeneral(samples: openarray[Sample], frequencies: openarray[float]): seq[Complex64] = | |
result = newSeq[Complex64](frequencies.len) | |
for i, freq in frequencies: | |
var gf = initGeneralizedGoertzel(freq) | |
result[i] = gf.processBlockGeneralized(samples) | |
#======= | |
proc generate(testData: var openarray[Sample], frequency: float) = | |
let step = frequency * ((2.0 * Pi) / SampleRate) | |
for index in 0..<N: | |
testData[index] = Sample(100.0 * sin(float(index) * step) + 100.0) | |
proc demoGeneralizedGoertzel() = | |
echo "Test Generalized Goertzel algorithm:" | |
# Create test frequencies with non-integer indices | |
let testFreqs = [ | |
TargetFrequency - 250.5, # Non-integer offset | |
TargetFrequency, | |
TargetFrequency + 250.5 # Non-integer offset | |
] | |
var testData: array[N, Sample] | |
# Test each frequency | |
for freq in testFreqs: | |
echo "Test frequency: ", freq | |
# Generate test data | |
testData.generate(freq) | |
# Process with generalized Goertzel | |
var gf = initGeneralizedGoertzel(freq) | |
let complexResult = gf.processBlockGeneralized(testData) | |
# Calculate magnitude | |
let | |
magnitude = sqrt(complexResult.re * complexResult.re + complexResult.im * complexResult.im) | |
phase = arctan2(complexResult.im, complexResult.re) | |
echo " Complex result: re=", complexResult.re, " im=", complexResult.im | |
echo " Magnitude: ", magnitude | |
echo " Phase: ", phase, " radians" | |
echo "" | |
# Demo to sweep through a range of frequencies including non-integer indices | |
proc frequencySweepDemo() = | |
echo "Frequency Sweep with Generalized Goertzel:" | |
var testData: array[N, Sample] | |
# Create a range of frequencies to test | |
var frequencies = newSeq[float]() | |
var freq = TargetFrequency - 300.0 | |
while freq <= TargetFrequency + 300.0: | |
frequencies.add(freq) | |
freq += 15.25 # Non-integer step | |
# Generate test data at the target frequency | |
testData.generate(TargetFrequency) | |
# Process all frequencies at once | |
let results = goertzelGeneral(testData, frequencies) | |
# Display results | |
for i in 0..<frequencies.len: | |
let | |
magnitude = sqrt(results[i].re * results[i].re + results[i].im * results[i].im) | |
phase = arctan2(results[i].im, results[i].re) | |
stdout.write "Freq=" & align($frequencies[i], 7, ' ') & " " | |
stdout.write "Mag=" & align($magnitude, 12, ' ') & " " | |
echo "Phase=" & align($phase, 8, ' ') | |
# Call this function to run the original demos and generalized Goertzel demos | |
proc runGoertzelDemos() = | |
# Generalized Goertzel demos | |
echo "\n=== Generalized Goertzel Demos ===\n" | |
demoGeneralizedGoertzel() | |
frequencySweepDemo() | |
proc main() = | |
runGoertzelDemos() | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# goertzelgeneralhoppinfilterbankflex.nim | |
## A Generalized Hoppin' Goertzel Filter Bank with per filter settable | |
## frequency and fft length. | |
## | |
## As results with different bin sizes don't match well, the data from filters | |
## with a wider bin / lower fft size / bigger bandwith, are redistributed to | |
## narrower bins. This is done using the sinc function and phase compensation. | |
## | |
import std/[complex, math, sequtils, strformat] | |
const | |
Samplerate* {.intdefine.} = 44100 | |
SRate* = Samplerate.float | |
type | |
GoertzelValueError* = object of ValueError | |
GHGFilterBank* = object | |
frequency*: seq[float] | |
fftSize*: seq[int] | |
gain*: seq[float] | |
normFreq*: seq[float] | |
maxFFTsize*: int | |
targetFreqs*: seq[float] | |
omega: seq[float] # 2*pi*frequencyIndex/N | |
coeff: seq[float] # 2*cos(omega) | |
sine: seq[float] # sin(omega) | |
cosine: seq[float] # cos(omega) | |
complexConst: seq[Complex64] # exp(-i*omega) | |
s0: seq[float] # State variables | |
s1: seq[float] | |
s2: seq[float] | |
sampleCount: seq[int] # Number of processed samples | |
proc nearestPow2(val: float):int= | |
pow(2.0, ceil(log10(val) / log10(2.0))).int | |
proc targetFrequencies*(startFreq, endFreq: float, numBins: int): seq[float] = | |
var tF = newSeq[float](numBins) | |
let logStart = ln(startFreq) | |
let logEnd = ln(endFreq) | |
let logStep = (logEnd - logStart) / (numBins - 1).float | |
for i in 0..<numBins: | |
tF[i] = exp(logStart + i.float * logStep) | |
return tF | |
proc initGHGFilterBank*( | |
frequency: seq[float], fftSize: seq[int] | |
): GHGFilterBank = | |
var ghgfb = GHGFilterBank( | |
frequency: frequency, | |
fftSize: fftSize, | |
gain: newSeq[float](frequency.len), | |
normFreq: newSeq[float](frequency.len), | |
targetFreqs: newSeq[float](frequency.len), | |
omega: newSeq[float](frequency.len), | |
coeff: newSeq[float](frequency.len), | |
sine: newSeq[float](frequency.len), | |
cosine: newSeq[float](frequency.len), | |
complexConst: newSeq[Complex64](frequency.len), | |
s0: newSeq[float](frequency.len), | |
s1: newSeq[float](frequency.len), | |
s2: newSeq[float](frequency.len), | |
sampleCount: newSeq[int](frequency.len) | |
) | |
(_, ghgfb.maxFFTsize) = minmax(fftSize) | |
var targetSize = ghgfb.maxFFTsize | |
if not ghgfb.maxFFTsize.int.isPowerOfTwo: | |
targetSize = nearestPow2(targetSize.float) | |
let (fMin, fMax) = minmax(ghgfb.frequency) | |
ghgfb.targetFreqs = targetFrequencies(fmin, fmax, targetSize) | |
# Initialize the first filter directly | |
let | |
freqIdx = frequency[0] * fftSize[0].float / SRate | |
omega = TAU * freqIdx / fftSize[0].float | |
# First filter | |
ghgfb.normFreq[0] = frequency[0] / (SRate / 2.0) | |
ghgfb.omega[0] = omega | |
ghgfb.sine[0] = sin(omega) | |
ghgfb.cosine[0] = cos(omega) | |
ghgfb.coeff[0] = 2.0 * ghgfb.cosine[0] | |
ghgfb.complexConst[0] = complex64(cos(-omega), sin(-omega)) | |
# "Hoppin' technique" for all following filters | |
for i in 1..<frequency.len: | |
# Normalized frequency bin index for previous and current filter | |
let | |
prevFreqIdx = frequency[i-1] * fftSize[i-1].float / SRate | |
currFreqIdx = frequency[i] * fftSize[i].float / SRate | |
deltaFreqIdx = currFreqIdx - prevFreqIdx | |
deltaOmega = TAU * deltaFreqIdx / fftSize[i].float | |
# Sine and cosine offset, using the "hopping method" | |
ghgfb.cosine[i] = ghgfb.cosine[i-1] * cos(deltaOmega) - ghgfb.sine[i-1] * sin(deltaOmega) | |
ghgfb.sine[i] = ghgfb.sine[i-1] * cos(deltaOmega) + ghgfb.cosine[i-1] * sin(deltaOmega) | |
# The rest of the bunch | |
ghgfb.normFreq[i] = frequency[i] / (SRate / 2.0) | |
ghgfb.omega[i] = TAU * currFreqIdx / fftSize[i].float | |
ghgfb.coeff[i] = 2.0 * ghgfb.cosine[i] | |
ghgfb.complexConst[i] = complex64(cos(-ghgfb.omega[i]), sin(-ghgfb.omega[i])) | |
return ghgfb | |
proc reset*(ghgfb: var GHGFilterBank) {.inline.} = | |
## Reset all filter states to zero befor processing a new block | |
for i in 0..<ghgfb.frequency.len: | |
ghgfb.s0[i] = 0.0 | |
ghgfb.s1[i] = 0.0 | |
ghgfb.s2[i] = 0.0 | |
ghgfb.sampleCount[i] = 0 | |
ghgfb.gain[i] = 0.0 | |
proc updateFilterGain(ghgfb: var GHGFilterBank, gains:seq[float]) {.inline.} = | |
ghgfb.gain = gains | |
proc calculateFilterBandwidth*(fftSize: int, sampleRate: float): float = | |
## Calculate the effective bandwidth of a Goertzel filter | |
## Based on the frequency resolution of the FFT size | |
return sampleRate / fftSize.float | |
proc redistributeComplexToOutput*( | |
outputComplex: var seq[Complex64], | |
filterComplex: Complex64, | |
filterCenterFreq: float, | |
filterFFTSize: int, | |
sampleRate: float, | |
targetFreqs: seq[float] | |
) = | |
## Different "strategies" based on filter bandwidth | |
let filterBandwidth = calculateFilterBandwidth(filterFFTSize, sampleRate) | |
# For very wide filters (low frequencies), use broader distribution | |
# For narrow filters (high frequencies), use tighter distribution | |
# Change to need or taste | |
let adaptiveSigma = if filterBandwidth > 100.0: | |
filterBandwidth / 3.0 | |
else: | |
filterBandwidth / 5.0 | |
let sigmaSq2 = 2.0 * adaptiveSigma * adaptiveSigma | |
let freqRange = 3.0 * adaptiveSigma | |
# Find affected range | |
var totalWeight = 0.0 | |
var weightedSum = complex64(0.0, 0.0) | |
for i in 0..<targetFreqs.len: | |
let freqDiff = abs(targetFreqs[i] - filterCenterFreq) | |
if freqDiff <= freqRange: | |
let freqDiffSigned = targetFreqs[i] - filterCenterFreq | |
let weight = exp(-(freqDiffSigned * freqDiffSigned) / sigmaSq2) | |
totalWeight += weight | |
weightedSum += complex64(weight, 0.0) | |
# Normalize and distribute | |
if totalWeight > 0.0: | |
for i in 0..<targetFreqs.len: | |
let freqDiff = abs(targetFreqs[i] - filterCenterFreq) | |
if freqDiff <= freqRange: | |
let freqDiffSigned = targetFreqs[i] - filterCenterFreq | |
let weight = exp(-(freqDiffSigned * freqDiffSigned) / sigmaSq2) | |
let normalizedWeight = weight / totalWeight | |
outputComplex[i] += filterComplex * normalizedWeight | |
proc processGFB*( | |
ghgfb: var GHGFilterBank, gain:seq[float], samples:seq[float] | |
): seq[Complex64] = | |
if samples.len < ghgfb.maxFFTSize or gain.len != ghgfb.frequency.len: | |
raise newException( | |
GoertzelValueError, | |
"The seq's maxFFTSize, gain and samples have to be of the same length" | |
) | |
ghgfb.reset() | |
ghgfb.updateFilterGain(gain) | |
var filterOutput = newSeq[Complex64](ghgfb.targetFreqs.len) | |
for s in 0..<ghgfb.maxFFTsize: | |
let sample = samples[s] | |
for i in 0..<ghgfb.frequency.len: | |
if ghgfb.sampleCount[i] == -1: | |
continue | |
# Process the sample through the filter | |
ghgfb.s0[i] = sample + ghgfb.coeff[i] * ghgfb.s1[i] - ghgfb.s2[i] | |
ghgfb.s2[i] = ghgfb.s1[i] | |
ghgfb.s1[i] = ghgfb.s0[i] | |
inc ghgfb.sampleCount[i] | |
# If this filter has completed its analysis, | |
# redistribute complex result over the ouput bins | |
if ghgfb.sampleCount[i] >= ghgfb.fftSize[i]: | |
let real = ghgfb.s1[i] - ghgfb.s2[i] * ghgfb.cosine[i] | |
let imag = ghgfb.s2[i] * ghgfb.sine[i] | |
redistributeComplexToOutput( | |
filterOutput, | |
complex64(real, imag), | |
ghgfb.frequency[i], | |
ghgfb.fftSize[i], | |
SRate, | |
ghgfb.targetFreqs | |
) | |
ghgfb.sampleCount[i] = -1 #filter i completed | |
return filterOutput | |
proc frequenciesOct*(baseFreq: float, fPerOctave: seq[int]): seq[float] = | |
## Generate frequency bins with varying resolution per octave | |
## | |
## Parameters: | |
## baseFreq: Starting frequency in Hz | |
## fPerOctave: Sequence specifying how many filters to use in each octave | |
## | |
## Returns: | |
## Sequence of logarithmically spaced frequencies | |
var freqs = @[baseFreq] # Start with the base frequency | |
var currentFreq = baseFreq | |
for octaveIdx, count in fPerOctave: | |
let octaveStart = currentFreq | |
let octaveEnd = octaveStart * 2.0 | |
if count > 0: | |
# Step size for logarithmic distribution within this octave | |
let step = pow(2.0, 1.0 / count.float) | |
# Add frequencies for this octave, excluding the start (already added) | |
for i in 1..count: | |
currentFreq = octaveStart * pow(step, i.float) | |
freqs.add(currentFreq) | |
else: | |
# If count is 0 | |
currentFreq = octaveEnd | |
freqs.add(currentFreq) | |
return freqs | |
proc goertzelBlockSize*( | |
frequencies: seq[float], sRate: float, | |
minBlockSize: int = 32, maxBlockSize: int = 8192 | |
): seq[int] = | |
## Find optimal block sizes for a bank of Goertzel filters | |
## | |
## Parameters: | |
## frequencies: Sequence of target frequencies for Goertzel filters | |
## sRate: Sample rate in Hz | |
## minBlockSize: Minimum allowed block size | |
## maxBlockSize: Maximum allowed block size | |
## | |
## Returns: | |
## Sequence of block sizes optimized for each Goertzel filter | |
var blockSizes = newSeq[int](frequencies.len) | |
for i in 0..(frequencies.len - 2): | |
# For Goertzel, we want N * f / sRate to be an integer for accurate | |
# frequency targeting | |
# Frequency gap to next band | |
let deltaF = frequencies[i+1] - frequencies[i] | |
var blockSize = (sRate / deltaF).ceil.int | |
# Adjust block size for better frequency alignment, find the closest block | |
# size where k = N * f / sRate is nearly integer | |
let targetK = frequencies[i] * blockSize.float / sRate | |
# If far from an int k value, adjust block size | |
if abs(targetK - targetK.round) > 0.1: | |
let adjustedSize = (sRate / frequencies[i] * targetK.round).ceil.int | |
if abs(adjustedSize - blockSize).float < blockSize.float * 0.1: | |
blockSize = adjustedSize | |
blockSize = max(minBlockSize, min(blockSize, maxBlockSize)) | |
blockSizes[i] = blockSize | |
# Set the last filter's block size | |
if frequencies.len > 1: | |
blockSizes[^1] = blockSizes[^2] | |
else: | |
# If there's only one frequency, estimate | |
blockSizes[0] = min( | |
maxBlockSize, max(minBlockSize, | |
(sRate / (frequencies[0] * 0.1)).ceil.int) | |
) | |
return blockSizes | |
proc getPhaseVector*(complexVector: seq[Complex64]): seq[float] = | |
## Extract phase information (in radians) from a complex vector | |
result = newSeq[float](complexVector.len) | |
for i in 0..<complexVector.len: | |
result[i] = arctan2(complexVector[i].im, complexVector[i].re) | |
proc getMagnitudeVector*(complexVector: seq[Complex64]): seq[float] = | |
## Extract magnitude information from a complex vector | |
result = newSeq[float](complexVector.len) | |
for i in 0..<complexVector.len: | |
result[i] = sqrt(complexVector[i].re * complexVector[i].re + | |
complexVector[i].im * complexVector[i].im) | |
proc getPower*(complexVector: seq[Complex64]): seq[float] = | |
result = newSeq[float](complexVector.len) | |
for i in 0..<complexVector.len: | |
result[i] = complexVector[i].re * complexVector[i].re + | |
complexVector[i].im * complexVector[i].im | |
when isMainModule: | |
proc generateTestSignal(frequencies: seq[float], amplitudes: seq[float], blockSize: int): seq[float] = | |
result = newSeq[float](blockSize) | |
for i in 0..<blockSize: | |
var sample = 0.0 | |
for j in 0..<frequencies.len: | |
let step = frequencies[j] * ((2.0 * PI) / SRate) | |
sample += amplitudes[j] * sin(float(i) * step) | |
result[i] = sample | |
proc bandPass*(frequency: seq[float], centerFreq: float, q: float = 1.0, order: float = 2.0): seq[float] = | |
var gain = newSeq[float](frequency.len) | |
let bw = centerFreq / q | |
for i in 0..<frequency.len: | |
let normalizedDist = abs(frequency[i] - centerFreq) / (bw / 2.0) | |
let g = 1.0 / (1.0 + pow(normalizedDist, 2.0 * order)) | |
gain[i] = g | |
return gain | |
let frequency = frequenciesOct(31.5, @[1,3,5,10,15,20,40,40,10]) | |
let fftlen = goertzelBlockSize(frequency, SRate) | |
var (_, fftmax) = minmax(fftlen) | |
echo "Test Signal Composition:" | |
let | |
testFreqs = @[50.0, 500.0, 1000.0, 2000.0, 6000.0] | |
testAmps = @[ 1.0, 50.0, 25.0, 75.0, 85.0] | |
testSignal = generateTestSignal(testFreqs, testAmps, fftmax) | |
for i in 0..<testFreqs.len: | |
echo fmt"Component {i+1}: {testFreqs[i]} Hz, Amplitude: {testAmps[i]}" | |
echo "\n" | |
var ghf = initGHGFilterBank(frequency, fftlen) | |
let gain = newSeqWith(ghf.frequency.len, 1.0) | |
#let gain = bandpass(frequency, 1500.0, 1.0, 2.0) | |
#echo gain | |
let x = ghf.processGFB(gain, testSignal) | |
let mag = getMagnitudeVector(x) | |
let (_,maxMag) = minmax(mag) | |
#let ph = getPhaseVector(x) | |
for i in 0..<mag.len: | |
if i mod 50 == 0: | |
echo ghf.targetFreqs[i]," Hz magnitude: ", mag[i] / maxMag | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment