Last active
February 20, 2022 06:05
-
-
Save alexanderbailey1/5378a7ace63e555d3c90f366c4e4d243 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Kurtosisf.py calculates the spectral kurtosis from fast-capture .dada files | |
# and outputs spectral kurtosis, real and imaginary kurtosis values, and flagging | |
# information for each window of every channel in all of the antennas. | |
# Commented out code relates to making histograms, and other junk. I know it's a mess. | |
# | |
# Original code written by Hugh Garsden | |
# Edited and made a general mess of by Alex Bailey, June 2017 | |
# | |
import numpy as np | |
import os, scipy.stats | |
import flags, random | |
import matplotlib.pyplot as plt | |
M = 10320/2 | |
FFT_SIZE = 8192 | |
file_count = 0 | |
def parse_complex(x): # from 8 bit complex | |
if type(x) == np.int8: | |
re = np.bitwise_and(x, np.int8(0b11110000)) / 16 | |
im = np.int8(np.left_shift(np.bitwise_and(x, np.int8(0b00001111)), 4)) /16 | |
else: # pretend data | |
return complex(x[0], x[1]) | |
#print re, im | |
return complex(im, re) | |
def kurtosis(): | |
# histogram = np.zeros(15) | |
auto_corr = np.zeros(M, dtype=np.complex64) | |
for i in range(M): | |
auto_corr[i] = kbuff[i]*np.conj(kbuff[i])/FFT_SIZE | |
# histogram[kbuff[i].real+7] += 1 | |
psd = 2.0*np.real(auto_corr)/FFT_SIZE # Data has been correlated, but these factors need to be added in | |
S1 = np.sum(psd) | |
S2 = np.sum(psd**2.0) | |
#Compute spectral kurtosis estimator | |
Vk_square = (M+1)/(M-1)*(M*(S2/(S1**2.0))-1) # eqn 8 http://mnrasl.oxfordjournals.org/content/406/1/L60.full.pdf | |
# and eqn 50 https://web.njit.edu/~gary/assets/PASP_Published_652409.pdf | |
return Vk_square, scipy.stats.kurtosis(np.real(kbuff)), scipy.stats.kurtosis(np.imag(kbuff)) | |
# main loop | |
directory = "/data2/fc/ledaovro8/one/" | |
for filename in os.listdir(directory): | |
file_count += 1 | |
#filename = "/data2/fc/ledaovro10/one/2017-06-07-07:53:21_0000009215016960.000000.dada" | |
print filename | |
if not os.path.exists("/home/leda/abailey/datadump8/8_near1_"+str(file_count)+".txt"): | |
fid=open(str(directory)+str(filename), "rb") | |
header=fid.read(4096) | |
data_all=np.fromfile(fid, dtype=np.int8, count=os.path.getsize(directory+filename)-4096) | |
goodSK = open("8_near1_"+str(file_count)+".txt", "a") | |
largeSK = open("8_large_"+str(file_count)+".txt", "a") | |
smallSK = open("8_small_"+str(file_count)+".txt", "a") | |
# Array dimensions from C code,want to convert | |
# [N sequence numbers][16 roaches][4 input groups][2 time samples][109 chans][4 stations][2 pols][2 dims][4 bits] | |
# dims is real/imag | |
# Buffer for kurtosis windows | |
kbuff = np.array([ 0j for i in range(M) ]) | |
kbuff_index = 0 | |
chunk_size = 16*4*2*109*4*2 # in bytes | |
num_chunks = data_all.shape[0]/chunk_size | |
print "Num chunks", num_chunks, "Chunk size (bytes)", chunk_size | |
print "Threshold?", 4.0*M**2/((M-1)*(M+2)*(M+3)) | |
if num_chunks != 10320: #this would throw an error and stop the loop. This error check simply results in an empty output text file | |
continue | |
# Python copy of Matlab | |
data_all = data_all.reshape(num_chunks, 64, 2, 109, 8) | |
data_all = np.transpose(data_all, (0, 2, 3, 1, 4)) | |
data_all = data_all.reshape((num_chunks*2, 109, 512)) # Data converted into nice form | |
#if os.path.exists("histograms.html"): | |
# os.remove("histograms.html") | |
fl = flags.Flags() # Flag bad ones | |
fl.select_leda_core() | |
for antenna in range(512): | |
#antenna = 2 | |
for channel in range(4): | |
chan = 4 + channel*20 | |
stand = antenna//2 | |
if fl.stand_flagged(stand): message = "FLAGGED" | |
else: message = "NOT_FLAGGED" | |
print antenna, chan | |
kbuff = np.array([ 0j for i in range(M) ]) | |
kbuff_index = 0 | |
#best_k = 1e9 | |
#best_sk = 1e9 | |
#best_sk_tester = 1e9 | |
#where_best_k = () | |
#where_best_sk = () | |
#random_chunk = int(np.floor(random.random()*num_chunks*2)) | |
for i in range(num_chunks*2): | |
c = parse_complex(data_all[i, chan, antenna]) | |
# if c.real not in [ -8, -7, 7 ] and c.imag not in [ -8, -7, 7 ]: # Clip edge values, we dont trust them | |
# Add to buffer | |
kbuff[kbuff_index] = parse_complex(data_all[i, chan, antenna]) | |
kbuff_index += 1 | |
#else: print "Got -8 value" | |
if kbuff_index == M: # Buffer is full | |
sk, k1, k2 = kurtosis() # Nita/ | |
# print "K", sk, k1, k2, stand, message | |
#sctplt.scatter(k1,k2) | |
if abs(1-sk) < .08353: #SK value within .08353 of one calculated from http://iopscience.iop.org/article/10.1086/652409#df56 | |
goodSK.write(str(antenna)+" "+str(chan)+" "+str(sk)+" "+str(k1)+" "+str(k2)+" "+str(message)+"\n") | |
elif sk > 1: | |
largeSK.write(str(antenna)+" "+str(chan)+" "+str(sk)+" "+str(k1)+" "+str(k2)+" "+str(message)+"\n") | |
else: | |
smallSK.write(str(antenna)+" "+str(chan)+" "+str(sk)+" "+str(k1)+" "+str(k2)+" "+str(message)+"\n") | |
# if abs(1-sk) < best_sk_tester: | |
# best_sk_tester = (1-abs(sk)) | |
# best_sk = abs(sk) | |
# where_best_sk = (i, chan, antenna ) | |
# if abs(k1) < best_k: | |
# best_k = abs(k1) | |
# where_best_k = ( i, chan, antenna ) | |
# if abs(k2) < best_k: | |
# best_k = abs(k2) | |
# where_best_k = ( i, chan, antenna ) | |
# kurtosis_data.write(str(sk) +" " + str(k1) + " " + str(k2) + "\n") | |
kbuff_index = 0 | |
# print "Res", best_sk, where_best_sk, where_best_k | |
#kurtosis_data.write(str(best_sk) + " " + str(where_best_sk)+"\n") | |
# plot(antenna, chan, sk, k1, k2, hist, message) | |
fid.close() | |
goodSK.close() | |
largeSK.close() | |
smallSK.close() | |
#sctplt.savefig("noise.png") | |
#sctplt.clf() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment