# Gist by @kgullikson88, created July 8, 2015
import warnings

import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.signal import fftconvolve
from astropy import constants, units

cimport numpy as np
cimport cython
from libc.math cimport sqrt

# The code below also references DataStructures (an xypoint container),
# rebin_spec (a pysynphot-based rebinner), and smoother (a spline smoother).
# These come from the author's companion modules and are not defined in this gist.
spline = InterpolatedUnivariateSpline  # alias assumed from the author's codebase

DTYPE = np.float64
ctypedef np.float64_t DTYPE_t
def Continuum(x, y, fitorder=3, lowreject=2, highreject=4, numiter=10000, function="poly"):
    """
    Fit the continuum of a spectrum by iteratively rejecting points that lie
    too far from the fit. The defaults work well for removing telluric lines
    in spectra of reasonable S/N ratio.

    x/y are assumed to be numpy arrays holding the spectrum to be fit.
    Note: Only the 'poly' function has been tested! Change to spline with extreme caution!
    """
    done = False
    x2 = np.copy(x)
    y2 = np.copy(y)
    iteration = 0
    while not done and iteration < numiter:
        iteration += 1
        done = True
        if function == "poly":
            fit = np.poly1d(np.polyfit(x2 - x2.mean(), y2, fitorder))
        elif function == "spline":
            # 'smoother' comes from the author's companion modules (untested path).
            fit = smoother(x2, y2, s=fitorder)
        residuals = y2 - fit(x2 - x2.mean())
        mean = np.mean(residuals)
        std = np.std(residuals)
        # Reject points more than lowreject sigma below or highreject sigma above the fit.
        badpoints = np.where(np.logical_or(residuals - mean < -lowreject * std,
                                           residuals - mean > highreject * std))[0]
        # Keep iterating only while enough points remain to constrain the fit.
        if badpoints.size > 0 and x2.size - badpoints.size > 5 * fitorder:
            done = False
            x2 = np.delete(x2, badpoints)
            y2 = np.delete(y2, badpoints)
    return fit(x - x2.mean())
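
# A minimal usage sketch for Continuum (not part of the original gist):
# normalize a synthetic spectrum with one absorption line. Only numpy from
# the imports above is needed; the _example_ name is hypothetical.
def _example_continuum():
    x = np.linspace(600.0, 610.0, 2000)                    # wavelength grid
    true_cont = 1.0 + 0.02 * (x - 605.0)                   # sloped continuum
    line = 0.5 * np.exp(-0.5 * ((x - 605.0) / 0.05) ** 2)  # narrow absorption line
    y = true_cont - line
    # The sigma-clipping loop rejects the line points, so the fit traces the continuum.
    cont = Continuum(x, y, fitorder=2, lowreject=1.5, highreject=5)
    return y / cont  # normalized spectrum, ~1.0 outside the line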
def RebinData(data, xgrid, synphot=True):
    """
    Rebin (x,y) data onto the grid given by the array xgrid.
    It is designed to rebin to a coarser wavelength grid, but can also
    interpolate to a finer grid.

    If synphot=True, it uses pySynphot for the rebinning, which conserves flux.
    Otherwise, it just interpolates the data and continuum, which is faster
    but could cause problems.
    """
    if synphot:
        # synphot chokes on astropy units, so remove them before proceeding.
        data, xunits, yunits = data.strip_units()
        if isinstance(xgrid, units.Quantity):
            xgrid = xgrid.value

        # Do the actual rebinning.
        newdata = DataStructures.xypoint(x=xgrid)
        newdata.y = rebin_spec(data.x, data.y, xgrid)
        newdata.cont = rebin_spec(data.x, data.cont, xgrid)
        newdata.err = rebin_spec(data.x, data.err, xgrid)  # Unlikely to be correct!

        # pysynphot has edge-effect issues on the first and last index,
        # so overwrite those with the nearest original values.
        firstindex = np.argmin(np.abs(data.x - xgrid[0]))
        lastindex = np.argmin(np.abs(data.x - xgrid[-1]))
        newdata.y[0] = data.y[firstindex]
        newdata.y[-1] = data.y[lastindex]
        newdata.cont[0] = data.cont[firstindex]
        newdata.cont[-1] = data.cont[lastindex]
        newdata.err[0] = data.err[firstindex]
        newdata.err[-1] = data.err[lastindex]

        # Re-apply units.
        newdata.x *= xunits
        newdata.y *= yunits
        newdata.err *= yunits
        newdata.cont *= yunits
        return newdata
    else:
        data_spacing = data.x[1] - data.x[0]
        grid_spacing = xgrid[1] - xgrid[0]
        newdata = DataStructures.xypoint(x=xgrid)
        if grid_spacing < 2.0 * data_spacing:
            # The new grid is fine enough that interpolation is safe.
            model_fcn = spline(data.x, data.y)
            cont_fcn = spline(data.x, data.cont)
            newdata.y = model_fcn(newdata.x)
            newdata.cont = cont_fcn(newdata.x)
        else:
            # Average the pixels that fall in each new bin (actual re-binning).
            left = np.searchsorted(data.x, (3 * xgrid[0] - xgrid[1]) / 2.0)
            for i in range(xgrid.size - 1):
                right = np.searchsorted(data.x, (xgrid[i] + xgrid[i + 1]) / 2.0)
                newdata.y[i] = np.mean(data.y[left:right])
                newdata.cont[i] = np.mean(data.cont[left:right])
                left = right
            right = np.searchsorted(data.x, (3 * xgrid[-1] - xgrid[-2]) / 2.0)
            newdata.y[xgrid.size - 1] = np.mean(data.y[left:right])
            newdata.cont[xgrid.size - 1] = np.mean(data.cont[left:right])
        return newdata
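
# A self-contained sketch of the bin-averaging branch above, using plain numpy
# instead of the DataStructures.xypoint container (which this gist does not
# define). Bin edges sit halfway between coarse grid points, exactly as in the
# searchsorted logic in RebinData. The _example_ name is hypothetical.
def _example_bin_average():
    x = np.linspace(500.0, 501.0, 1000)      # fine native grid
    y = np.sin(20.0 * x) + 2.0               # some smooth signal
    xgrid = np.linspace(500.0, 501.0, 100)   # coarser target grid
    edges = np.concatenate(([(3 * xgrid[0] - xgrid[1]) / 2.0],
                            (xgrid[:-1] + xgrid[1:]) / 2.0,
                            [(3 * xgrid[-1] - xgrid[-2]) / 2.0]))
    idx = np.searchsorted(x, edges)
    ynew = np.empty(xgrid.size)
    for i in range(xgrid.size):
        ynew[i] = np.mean(y[idx[i]:idx[i + 1]])  # average the pixels in each bin
    return xgrid, ynew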
"""
Here is the main, optimized function. It is stolen shamelessly from
the avsini.c implementation given in the SPECTRUM code distribution.
A wrapper called 'Broaden' with my standard calls is below
"""
@cython.boundscheck(False)
@cython.wraparound(False)
cdef np.ndarray[DTYPE_t, ndim=1] convolve(np.ndarray[DTYPE_t, ndim=1] y,
                                          np.ndarray[DTYPE_t, ndim=1] ys,
                                          long num,
                                          long nd,
                                          double st,
                                          double dw,
                                          double vsini,
                                          double u):
    cdef double beta, gam, w, dlc, c1, c2, dv, r1, r2, v
    cdef long i, n1, n2, n
    cdef double clight = 299791.0  # speed of light in km/s, as in SPECTRUM's avsini.c
    cdef DTYPE_t f, t, s

    # Linear limb-darkening weights for the rotation profile.
    beta = (1.0 - u) / (1.0 - u / 3.0)
    gam = u / (1.0 - u / 3.0)

    # End effect: copy the edge pixels, where the kernel would run off the array.
    n1 = nd + 1
    ys[1:n1] = y[1:n1]
    n2 = num - nd - 1
    ys[n2:num + 1] = y[n2:num + 1]
    if vsini < 0.5:
        # Rotation is negligible; return the input unchanged.
        return y

    # Convolve with the rotation profile.
    w = st + (n1 - 1) * dw
    for n in range(n1, n2 + 1):
        w = w + dw
        s = 0.0
        t = 0.0
        dlc = w * vsini / clight      # rotational Doppler width in wavelength units
        c1 = 0.63661977 * beta / dlc  # 2/pi * beta / dlc
        c2 = 0.5 * gam / dlc
        dv = dw / dlc                 # pixel step in units of the Doppler width
        for i in range(-nd, nd + 1):
            v = i * dv
            r2 = 1.0 - v ** 2
            if r2 > 0.0:
                f = c1 * sqrt(r2) + c2 * r2
                t += f
                s += f * y[n + i]
        ys[n] = s / t  # normalize by the summed kernel weight
    return ys
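
# For reference, the per-pixel weight inside the loop above is the classic Gray
# rotation profile; the 1/dlc factors in c1 and c2 cancel in the s/t
# normalization. A minimal numpy sketch of one kernel evaluation (names are
# hypothetical, not part of the original gist):
def _example_rotation_kernel(u=0.5, npts=51):
    beta = (1.0 - u) / (1.0 - u / 3.0)
    gam = u / (1.0 - u / 3.0)
    v = np.linspace(-1.0, 1.0, npts)  # velocity in units of vsini
    r2 = 1.0 - v ** 2
    kernel = (2.0 / np.pi) * beta * np.sqrt(r2) + 0.5 * gam * r2
    return kernel / kernel.sum()      # normalized, as ys[n] = s/t does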
"""
This is the wrapper function. The user should call this!
"""
def Broaden_old(model, vsini, epsilon=0.5, linear=False, findcont=False):
    """
    model:    xypoint instance with the data (or model) to be broadened
    vsini:    the rotation velocity (times sin(i)) of the star, in cm/s
    epsilon:  linear limb-darkening coefficient: I(u) = 1 - epsilon + epsilon*u
    linear:   flag saying the x-spacing is already linear; if True, we don't need to linearize
    findcont: flag to decide whether the continuum needs to be found
    """
    if not linear:
        x = np.linspace(model.x[0], model.x[-1], model.size())
        model = RebinData(model, x)
    else:
        x = model.x
    if findcont:
        model.cont = Continuum(model.x, model.y, lowreject=1.5, highreject=10)

    # Prepare a few more variables before calling 'convolve'.
    broadened = model.copy()
    num = model.size()
    ys = np.ones(num)
    start = model.x[0]
    dwave = model.x[1] - model.x[0]
    # Maximum rotational half-width of the kernel, in pixels, plus a safety margin.
    s2 = (start + num * dwave) * vsini / (dwave * constants.c.cgs.value)
    vsini *= units.cm.to(units.km)  # convolve() expects km/s
    nd = int(s2 + 5.5)              # cast to an integer pixel count for the C loop
    broadened.y = convolve(model.y, ys, num, nd, start, dwave, vsini, epsilon)
    return broadened
def Broaden(model, vsini, epsilon=0.5, linear=False, findcont=False):
    """
    model:    xypoint instance with the data (or model) to be broadened
    vsini:    the rotation velocity (times sin(i)) of the star, in cm/s
    epsilon:  linear limb-darkening coefficient: I(u) = 1 - epsilon + epsilon*u
    linear:   flag saying the x-grid is already logarithmically spaced; if True, we don't need to resample
    findcont: flag to decide whether the continuum needs to be found
    """
    c = constants.c.cgs.value
    if not linear:
        x0 = model.x[0]
        x1 = model.x[-1]
        x = np.logspace(np.log10(x0), np.log10(x1), model.size())
        model = RebinData(model, x)
    else:
        x = model.x
    if findcont:
        model.cont = Continuum(model.x, model.y, lowreject=1.5, highreject=10)

    # Make the broadening kernel. On a log-wavelength grid a fixed velocity
    # width is a fixed pixel width, so one kernel works for the whole spectrum.
    dx = np.log(x[1] / x[0])
    lim = vsini / c
    if lim < dx:
        # vsini is less than one pixel. Don't broaden.
        warnings.warn("vsini too small ({}). Not broadening!".format(vsini))
        return model.copy()
    #d_logx = np.arange(-lim, lim, dx*np.log10(200.0))
    d_logx = np.arange(0.0, lim, dx)
    d_logx = np.r_[-d_logx[::-1][:-1], d_logx]  # symmetric grid centered on zero
    alpha = 1.0 - (d_logx / lim) ** 2
    B = (1.0 - epsilon) * np.sqrt(alpha) + epsilon * np.pi * alpha / 4.0  # broadening kernel
    B /= B.sum()  # normalize

    # Do the convolution.
    broadened = model.copy()
    broadened.y = fftconvolve(model.y, B, mode='same')
    return broadened
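
# A usage sketch (not in the original gist). It assumes the author's
# DataStructures.xypoint container accepts x and y keywords, so it is
# illustrative rather than runnable from this file alone; the variable
# names are hypothetical.
def _example_broaden():
    wave = np.linspace(5000.0, 5100.0, 10000)  # wavelengths
    flux = 1.0 - 0.6 * np.exp(-0.5 * ((wave - 5050.0) / 0.2) ** 2)
    model = DataStructures.xypoint(x=wave, y=flux)
    # vsini is given in cm/s: 20 km/s = 20e5 cm/s.
    return Broaden(model, vsini=20e5, epsilon=0.6, findcont=False)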