|
# preview of private mwax_mover code that does the fitting |
|
# https://github.com/MWATelescope/mwax_mover/blob/43a25530e10f680f455ffa8152103a38979b1f3b/src/mwax_mover/mwax_calvin_utils.py#L755 |
|
def fit_phase_line( |
|
freqs_hz: NDArray[np.float64], |
|
solution: NDArray[np.complex128], |
|
weights: NDArray[np.float64], |
|
niter: int = 1, |
|
fit_iono: bool = False, |
|
# chanblocks_per_coarse: int, |
|
# bin_size: int = 10, |
|
# typical_thickness: float = 3.9, |
|
) -> PhaseFitInfo: |
|
""" |
|
Linear fit phases |
|
- freqs: array of frequencies in Hz |
|
- solution: complex array of solutions |
|
- niter: number of iterations to perform |
|
|
|
Credit: Dr. Sammy McSweeny, Marcin Sokolowski |
|
https://arxiv.org/pdf/2005.02041 |
|
""" |
|
|
|
# original number of frequencies |
|
nfreqs = len(freqs_hz) |
|
|
|
# sort by frequency |
|
ind = np.argsort(freqs_hz) |
|
freqs_hz = freqs_hz[ind] |
|
solution = solution[ind] |
|
weights = weights[ind] |
|
|
|
# Choose a suitable frequency bin width: |
|
# - Assume the frequencies are "quantised" (i.e. all integer multiples of some constant) |
|
# - Assume there is at least one example of a pair of consecutive bins present |
|
# - Do not assume the arrays are ordered in increasing frequency |
|
|
|
# Get the minimum difference between two (now-ordered) consecutive bins, and |
|
# declare this to be the bin width |
|
dν = np.min(np.diff(freqs_hz)) * u.Hz |
|
|
|
# remove nans and zero weights |
|
mask = np.where(np.logical_and(np.isfinite(solution), weights > 0))[0] |
|
if len(mask) < 2: |
|
raise RuntimeError(f"Not enough valid phases to fit ({len(mask)})") |
|
solution = solution[mask] |
|
freqs_hz = freqs_hz[mask] |
|
weights = weights[mask] |
|
|
|
# normalise |
|
solution /= np.abs(solution) |
|
solution *= weights |
|
# print(f"{np.angle(solution)[:4]=}, ") |
|
|
|
# Now we want to "adjust" the solution data so that it |
|
# - is roughly centered on the DC bin |
|
# - has a large amount of zero padding on either side |
|
ν = freqs_hz * u.Hz # type: ignore |
|
bins = np.round((ν / dν).decompose().value).astype(int) |
|
ctr_bin = (np.min(bins) + np.max(bins)) // 2 |
|
shifted_bins = bins - ctr_bin # Now "bins" represents where I want to put the solution values |
|
|
|
# ...except that ~1/2 of them are negative, so I'll have to add a certain amount |
|
# once I decide how much zero padding to include. |
|
# This is set by the resolution I want in delay space (Nyquist rate) |
|
# type: ignore |
|
dm = 0.01 * u.m # type: ignore |
|
dt = dm / c # The target time resolution |
|
νmax = 0.5 / dt # The Nyquist rate |
|
N = 2 * int(np.round(νmax / dν)) # The number of bins to use during the FFTs |
|
|
|
shifted_bins[ |
|
shifted_bins < 0 |
|
] += N # Now the "negative" frequencies are put at the end, which is where FFT wants them |
|
|
|
# Create a zero-padded, shifted version of the spectrum, which I'll call sol0 |
|
# sol0: This shifts the non-zero data down to a set of frequencies straddling the DC bin. |
|
# This makes the peak in delay space broad, and lets us hone in near the optimal solution by |
|
# finding the peak in delay space |
|
sol0 = np.zeros((N,)).astype(complex) |
|
sol0[shifted_bins] = solution |
|
|
|
# IFFT of sol0 to get the approximate solution as the peak in delay space |
|
isol0 = np.fft.ifft(sol0) |
|
t = -np.fft.fftfreq(len(sol0), d=dν.to(u.Hz).value) * u.s # (Not sure why this negative is needed) |
|
d = np.fft.fftshift(c * t) |
|
isol0 = np.fft.fftshift(isol0) |
|
|
|
# Find max peak, and the equivalent slope |
|
imax = np.argmax(np.abs(isol0)) |
|
dmax = d[imax] |
|
# print(f"{dmax=:.02f}") |
|
slope = (2 * np.pi * u.rad * dmax / c).to(u.rad / u.Hz) |
|
# print(f"{slope=:.10f}") |
|
|
|
# Now that we're near a local minimum, get a better one by doing a standard minimisation |
|
# To get the y-intercept, divide the original data by the constructed data |
|
# and find the average phase of the result |
|
# if fit_iono: |
|
# model = lambda ν, m, c, α: np.exp(1j * (m * ν + c + α / ν**2)) |
|
# y_int = np.angle(np.mean(solution / model(ν.to(u.Hz).value, slope.value, 0, 0))) |
|
# params = (slope.value, y_int, 0) |
|
|
|
def model(ν, m, c): |
|
return np.exp(1j * (m * ν + c)) |
|
|
|
y_int = np.angle(np.mean(solution / model(ν.to(u.Hz).value, slope.value, 0))) |
|
params = (slope.value, y_int) |
|
|
|
def objective(params, ν, data): |
|
constructed = model(ν, *params) |
|
residuals = wrap_angle(np.angle(data) - np.angle(constructed)) |
|
cost = np.sum(np.abs(residuals) ** 2) |
|
return cost |
|
|
|
resid_std, chi2dof, stderr = None, None, None |
|
# while len(mask) >= 2 and (niter:= niter - 1) <= 0: |
|
while True: |
|
res = minimize(objective, params, args=(ν.to(u.Hz).value, solution)) |
|
params = res.x |
|
constructed = model(ν.to(u.Hz).value, *params) |
|
residuals = wrap_angle(np.angle(solution) - np.angle(constructed)) |
|
chi2dof = np.sum(np.abs(residuals) ** 2) / (len(residuals) - len(params)) |
|
resid_std = residuals.std() |
|
resid_var = residuals.var(ddof=len(params)) |
|
stderr = np.sqrt(np.diag(res.hess_inv * resid_var)) |
|
mask = np.where(np.abs(residuals) < 2 * stderr[0])[0] |
|
solution = solution[mask] |
|
ν = ν[mask] |
|
# TODO: iterations? |
|
# niter = niter-1 |
|
# if len(mask) < 2 or niter <= 0: |
|
# break |
|
break |
|
|
|
period = ((params[0] * u.rad / u.Hz) / (2 * np.pi * u.rad)).to(u.s) |
|
quality = len(mask) / nfreqs |
|
|
|
return PhaseFitInfo( |
|
length=(c * period).to(u.m).value, |
|
intercept=wrap_angle(params[1]), |
|
sigma_resid=resid_std, |
|
chi2dof=chi2dof, |
|
quality=quality, |
|
stderr=stderr[0], |
|
# median_thickness=median_thickness, |
|
) |