Skip to content

Instantly share code, notes, and snippets.

@tam17aki
Last active June 2, 2025 07:21
Show Gist options
  • Select an option

  • Save tam17aki/81a447e1e9559430255e90dd7a54446f to your computer and use it in GitHub Desktop.

Select an option

Save tam17aki/81a447e1e9559430255e90dd7a54446f to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""A demonstration of Fundamental Frequency Estimation Based on FOHCDE.
Copyright (C) 2025 by Akira TAMAMORI
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import math
import numpy as np
import numpy.polynomial.polynomial as poly
import numpy.typing as npt
from scipy.signal import butter, convolve, filtfilt
def apply_bandpass_filter(
signal: npt.NDArray[np.float64],
fs: float,
lowcut: int,
highcut: int,
order: int = 4,
) -> npt.NDArray[np.float64]:
"""Apply a Butterworth bandpass filter to the signal.
Args:
signal (np.ndarray): Input discrete-time signal.
fs (float): Sampling frequency in Hz.
lowcut (int): Lower cutoff frequency.
highcut (int): Higher cutoff frequency.
order (int): Order of bandpass filter coeff.
Returns:
filtered_signal (np.ndarray): Bandpass-filtered signal.
"""
nyquist = 0.5 * fs
low = lowcut / nyquist
high = highcut / nyquist
filter_coef = butter(order, [low, high], btype="band")
assert (
isinstance(filter_coef, tuple)
and len(filter_coef) == 2
and isinstance(filter_coef[0], np.ndarray)
and isinstance(filter_coef[1], np.ndarray)
)
filtered_signal: npt.NDArray[np.float64] = filtfilt(
filter_coef[0], filter_coef[1], signal
)
return filtered_signal
def get_kth_even_derivative(
s: npt.NDArray[np.float64], k_order: int, dt: float
) -> tuple[npt.NDArray[np.float64], int]:
"""Calculates the k-th order even derivative using centered differences.
Args:
s (np.ndarray): Input signal.
k_order (int): The order of the derivative (must be an even number).
dt (float): Sampling period.
Returns:
tuple: (derived_signal, pad_length_per_side)
derived_signal: The calculated derivative.
pad_length_per_side: Number of points trimmed from each
side of the original signal.
"""
if k_order % 2 != 0:
raise ValueError("Only even order derivatives are supported.")
if k_order < 0:
raise ValueError("Derivative order cannot be negative.")
kernel = np.array([1.0])
for _ in range(k_order // 2):
kernel = np.convolve(kernel, np.array([1.0, -2.0, 1.0]), mode="full")
derived_s = convolve(s, kernel, mode="valid") / (dt**k_order)
return derived_s, k_order // 2
def calc_covmat_mn(
m: int,
n: int,
signal_original: npt.NDArray[np.float64],
dt: float,
max_derivative_order: int,
) -> np.float64 | float:
"""Calculates the sum of products of (2m)-th and (2n)-th derivatives for FOHCDE.
All derivatives are trimmed to the length of the highest required derivative.
The dt scaling is applied *after* the sum.
Args:
m (int): The order of the derivative.
n (int): The order of the derivative
signal_original (np.darray): Original signal.
dt (float): Sampling period.
max_derivative_order (int): Maximum of the order of derivative
Returns:
cov_mat (float): The (m, n) element of covariance matrix.
"""
# Check if the signal is long enough for the highest derivative
if len(signal_original) < max_derivative_order + 1:
return 0.0
# Calculate all required derivatives (without dt scaling)
deriv_x_m_full, _ = get_kth_even_derivative(signal_original, 2 * m, dt)
deriv_x_n_full, _ = get_kth_even_derivative(signal_original, 2 * n, dt)
# Determine the effective length after applying the highest order derivative
target_len = len(signal_original) - max_derivative_order # This is 2*M
if target_len <= 0:
return 0.0
# Calculate padding for each derivative to align to the target_len
# For a 2k-th derivative, its length is L - 2k.
# We want to trim it to L - 2M.
# So, we trim ( (L - 2k) - (L - 2M) ) / 2 = (2M - 2k) / 2 = M - k points
# from each side.
# M is max_derivative_order // 2. k is m or n.
pad_m_to_align = (max_derivative_order // 2) - m
pad_n_to_align = (max_derivative_order // 2) - n
trimmed_deriv_m = deriv_x_m_full[
pad_m_to_align : len(deriv_x_m_full) - pad_m_to_align
]
trimmed_deriv_n = deriv_x_n_full[
pad_n_to_align : len(deriv_x_n_full) - pad_n_to_align
]
if len(trimmed_deriv_m) != target_len or len(trimmed_deriv_n) != target_len:
# This indicates a logic error or unexpected behavior.
# It's crucial for numerical stability that these lengths match exactly.
print(
f"Debug: Trimmed lengths mismatch target! m={m}, n={n}, "
+ f"len_m={len(trimmed_deriv_m)}, len_n={len(trimmed_deriv_n)}, "
+ f"target={target_len}"
)
return 0.0
covmat_mn: np.float64 = np.sum(trimmed_deriv_m * trimmed_deriv_n)
return covmat_mn
def calc_covmat(
signal_original: npt.NDArray[np.float64], dt: float, max_harmonic_order: int
) -> npt.NDArray[np.float64]:
"""Calculate the covariance matrix.
Args:
signal_original (np.darray): Original signal.
dt (float): Sampling period.
max_harmonic_order (int): Maximum order of harmonics.
Returns:
covmat (np.darray): Polynomial coefficients a_k.
"""
max_derivative_order = 2 * max_harmonic_order
covmat = np.zeros(
(max_harmonic_order + 1, max_harmonic_order + 1), dtype=np.float64
)
for m in range(max_harmonic_order + 1):
for n in range(max_harmonic_order + 1):
covmat[m, n] = calc_covmat_mn(
m, n, signal_original, dt, max_derivative_order
)
# Ensure S_matrix is invertible (check rank)
if (
np.linalg.matrix_rank(covmat, tol=1e-12) < covmat.shape[0]
): # tolerance for rank check
print("Warning: S matrix is singular or near-singular. Cannot proceed.")
return np.array([])
return covmat
def calc_alpha_coeff(max_harmonic_order: int) -> npt.NDArray[np.float64]:
"""Calculate the polynomial coefficients a_k from product_{m=1}^{M} (1 + m^2 p).
Args:
max_harmonic_order (int): Maximum order of harmonics.
Returns:
alpha_coeffs (np.darray): Polynomial coefficients a_k.
"""
# The roots of (1 + m^2 p) are -1/(m^2).
roots_for_poly = np.array(
[-1.0 / (m_val**2) for m_val in range(1, max_harmonic_order + 1)],
dtype=np.float64,
)
# Coefficients of Q(p) = (1/1^2 * ... * 1/M^2) + ... + p^M
# Note: ascending order
poly_q_coeffs = poly.polyfromroots(roots_for_poly)
# Calculate the leading coefficient (product_{m=1}^{M} m^2)
leading_factor: int = math.prod(
[m_val**2 for m_val in range(1, max_harmonic_order + 1)]
)
# alpha_coeffs should be coefficients of P(p) = sum_{k=0}^{M} a_k p^k.
# poly.polyfromroots gives [c_0, c_1, ..., c_M].
# So it's already in ascending order of power.
# alpha_coeffs[k] is coefficient of p^k (a_k)
alpha_coeffs: npt.NDArray[np.float64] = (leading_factor * poly_q_coeffs).astype(
np.float64
)
return alpha_coeffs
def calc_beta_coeff(
alpha_coeffs: npt.NDArray[np.float64],
cov_mat: npt.NDArray[np.float64],
max_harmonic_order: int,
) -> npt.NDArray[np.float64]:
"""Calculate the polynomial coefficients beta_k from alpha_k.
Args:
alpha_coeffs (np.darray): The coefficients of polynomial for Eq. (14).
cov_mat (np.darray): The covariance matrix for Eq. (8).
max_harmonic_order (int): Maximum order of harmonics.
Returns:
beta_coeffs (np.darray): Polynomial coefficients beta_k.
"""
beta_coeffs = np.zeros(2 * max_harmonic_order + 1) # beta_0 to beta_2M
for i in range(max_harmonic_order + 1):
for j in range(max_harmonic_order + 1):
beta_coeffs[i + j] += (
alpha_coeffs[i]
* alpha_coeffs[j]
* cov_mat[max_harmonic_order - i, max_harmonic_order - j]
)
return beta_coeffs
def calc_poly_coeff(
beta_coeffs: npt.NDArray[np.float64], max_harmonic_order: int
) -> npt.NDArray[np.float64]:
"""Solve polynomial equation for the fundamental frequency.
Args:
beta_coeffs (np.darray): The coefficients of polynomial for Eq. (15).
max_harmonic_order (int): Maximum order of harmonics.
Returns:
poly_coeffs (np.darray): Estimated fundamental frequency.
"""
temp_coeffs_ascending_power = np.zeros(2 * max_harmonic_order)
for k_power in range(
1, 2 * max_harmonic_order + 1
): # k_power is the power in J(Omega)
# Coefficient for Omega^(k_power - 1) in dJ/dOmega
temp_coeffs_ascending_power[k_power - 1] = k_power * beta_coeffs[k_power]
poly_coeffs = temp_coeffs_ascending_power
return poly_coeffs
def solve_poly_getf0(
poly_coeffs: npt.NDArray[np.float64], fs: float
) -> npt.NDArray[np.float64]:
"""Solve polynomial equation for the fundamental frequency.
Args:
poly_coeffs (np.darray): The coefficients of polynomial for Eq. (18).
fs (float): Sampling frequency.
Returns:
f0 (np.darray): Estimated fundamental frequency.
"""
omegas_squared_candidates = poly.polyroots(poly_coeffs)
estimated_f0_hz = []
for omega_sq in omegas_squared_candidates:
# Candidate should be real and positive (and within reasonable tolerance)
if np.isreal(omega_sq) and omega_sq.real > 0.0:
omega_1 = np.sqrt(omega_sq.real)
f0_hz = omega_1 / (2 * np.pi)
estimated_f0_hz.append(f0_hz)
# Return the lowest valid positive real root within a plausible range
if estimated_f0_hz:
plausible_f0s = [
f for f in estimated_f0_hz if 20 <= f <= fs / 2
] # Max freq is Nyquist limit
if plausible_f0s:
# Often, the fundamental is the lowest frequency.
# But the polynomial can have multiple roots.
# In a real scenario, you might pick the one with the largest power or
# the one closest to a previous estimate.
# For this demo, let's return the most plausible one if possible.
# If multiple plausible, return the lowest one.
f0: npt.NDArray[np.float64] = np.array([min(plausible_f0s)])
return f0
return np.array([]) # No plausible F0 found
return np.array([])
# --- Core FOHCDE (Section 3.3.2) implementation ---
def estimate_fundamental_frequency(
signal: npt.NDArray[np.float64], fs: float, max_harmonic_order: int
) -> npt.NDArray[np.float64]:
"""Estimates the fundamental frequency of a single source using FOHCDE.
This script implements the method described in Section 3.3.2 of:
Kenta Yamada, Yoshiki Masuyama, Kouei Yamaoka, Natsuki Ueno, and Nobutaka Ono,
"Multiple Pitch Estimation Based on Finite-Order Harmonic Constraint Differential
Equation," IEICE Technical Report EA2023-115, SIP2023-162, SP2023-97(2024-03).
https://www.ieice.org/ken/paper/202403155G5Z/
This method assumes a single fundamental frequency source with specified harmonic
orders.
Args:
signal (np.ndarray): Input discrete-time signal.
fs (float): Sampling frequency in Hz.
max_harmonic_order (int): Maximum order of harmonics.
Returns:
estimated fundamental frequency (float).
"""
if max_harmonic_order <= 0:
return np.array([])
dt = 1.0 / fs
# The highest derivative order needed for S_i,j is 2*max_harmonic_order (2M).
max_deriv_order_needed = 2 * max_harmonic_order
# Check if the signal is long enough for the highest derivative
if len(signal) < max_deriv_order_needed + 1:
print(
f"Error: Signal length {len(signal)} is too short for a "
+ f"{max_deriv_order_needed}-th order derivative."
)
return np.array([])
# 1. Calculate the covariance matrix (Eq. 17)
covmat = calc_covmat(signal, dt, max_harmonic_order)
# 2. Calculate the polynomial coefficients a_k from product_{m=1}^{M} (1 + m^2 p)
# (Eq. 14)
alpha_coeffs = calc_alpha_coeff(max_harmonic_order)
# 3. Calculate beta_k coefficients for the J(Omega) polynomial (Eq. 15)
# J(Omega) = sum_{k=0}^{2M} beta_k Omega^k
# beta_k = sum_{i=0 to M} sum_{j=0 to M} { a_i a_j S_i,j if i+j == k else 0 }
beta_coeffs = calc_beta_coeff(alpha_coeffs, covmat, max_harmonic_order)
# 4. Form the polynomial for dJ/dOmega = 0 (Eq. 18)
# dJ/dOmega = sum_{k=1}^{2M} (k * beta_k * Omega^(k-1))
# This is a polynomial of degree (2M-1).
# Its coefficients in descending order of power (for np.roots) are:
# [ (2M)*beta_2M, (2M-1)*beta_{2M-1}, ..., 1*beta_1 ]
poly_coeffs = calc_poly_coeff(beta_coeffs, max_harmonic_order)
# 5. Solve the polynomial equation for Omega (omega_1^2)
f0 = solve_poly_getf0(poly_coeffs, fs)
return f0
def main() -> None:
"""Perform demonstration for FOHCDE (Single Fundamental Frequency)."""
fs = 44100 # Sampling frequency (Hz)
duration = 0.04 # Signal duration (seconds) - 40ms
t = np.linspace(0, duration, int(fs * duration), endpoint=False)
# True fundamental frequency and max harmonic order (M)
f_true_f0 = 200.0 # Hz
max_harmonic_order = 3 # M=3: Signal contains f0, 2*f0, 3*f0
# Generate the test signal (f0 and its harmonics)
amplitude = 1.0
signal = {}
signal["pure"] = np.zeros_like(t, dtype=np.float64)
for m in range(1, max_harmonic_order + 1):
signal["pure"] += amplitude * np.cos(2 * np.pi * (m * f_true_f0) * t)
print(
f"--- FOHCDE Single Source Demo: f0={f_true_f0} Hz "
+ f"with M={max_harmonic_order} harmonics ---"
)
print(f"True fundamental frequency: {f_true_f0} Hz")
print(
f"Signal contains harmonics up to {max_harmonic_order}x "
+ f"f0 ({f_true_f0 * max_harmonic_order} Hz)"
)
# --- Test without noise ---
print(f"\n--- Test without noise ({duration*1000:.0f}ms signal) ---")
estimated_f0s = estimate_fundamental_frequency(
signal["pure"], fs, max_harmonic_order
)
print(f"Estimated fundamental frequency: {estimated_f0s[0]:.4f} Hz")
if len(estimated_f0s) == 1 and estimated_f0s[0] != 0.0:
print(f"Estimation error: {np.abs(estimated_f0s[0] - f_true_f0):.4f} Hz")
else:
print("Warning: Did not estimate exactly one fundamental frequency.")
# --- Test with band-limited noise ---
snr_db = 10
print(
f"\n--- Test with band-limited noise ({duration*1000:.0f}ms signal, "
+ f"SNR {snr_db}dB) ---"
)
signal_power = np.var(signal["pure"])
noise_power = signal_power / (10 ** (snr_db / 10))
noise = np.random.normal(0, np.sqrt(noise_power), len(signal["pure"]))
signal["noisy"] = signal["pure"] + apply_bandpass_filter(
noise,
fs,
lowcut=int(f_true_f0 * 0.9),
highcut=int(f_true_f0 * max_harmonic_order * 1.1),
)
estimated_f0s = estimate_fundamental_frequency(
signal["noisy"], fs, max_harmonic_order
)
print(f"Estimated fundamental frequency with noise: {estimated_f0s[0]:.4f} Hz")
if len(estimated_f0s) == 1:
print(f"Estimation error: {np.abs(estimated_f0s[0] - f_true_f0):.4f} Hz")
else:
print("Warning: Did not estimate exactly one fundamental frequency.")
# --- Test with very short signal duration ---
duration_short = 0.002 # 2ms
print(
f"\n--- Test with very short signal duration ({duration_short*1000:.0f}ms) ---"
)
t_short = np.linspace(0, duration_short, int(fs * duration_short), endpoint=False)
signal["short"] = np.zeros_like(t_short, dtype=np.float64)
for m in range(1, max_harmonic_order + 1):
signal["short"] += amplitude * np.cos(2 * np.pi * (m * f_true_f0) * t_short)
estimated_f0s = estimate_fundamental_frequency(
signal["short"], fs, max_harmonic_order
)
print(
"Estimated fundamental frequency for 2ms signal:"
+ f" {estimated_f0s[0]:.4f} Hz"
)
if len(estimated_f0s) == 1:
print("Estimation error: " + f" {np.abs(estimated_f0s[0] - f_true_f0):.4f} Hz")
else:
print("Warning: Did not estimate exactly one fundamental frequency.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment