Last active
June 15, 2023 20:08
-
-
Save louismullie/b111be352e4f581670ed8dedf00cfd13 to your computer and use it in GitHub Desktop.
CareScape parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from struct import iter_unpack | |
from quopri import decodestring | |
import magic | |
import matplotlib.pyplot as plt | |
import numpy as np | |
# One row per plotted channel:
# (channel name, unit label, y-axis lower limit, y-axis upper limit)
_CHANNEL_SETTINGS = (
    ('EKG', 'uV', -0.75, 0.75),
    ('Pulse oximetry', '%', 0, 150),
    ('Pressure - artery', 'mmHg', 0, 150),
    ('Pressure - central_venous', 'mmHg', 0, 30),
)

# Constants
# Unit label shown on the y-axis of each channel.
UNITS = {channel: unit for channel, unit, _low, _high in _CHANNEL_SETTINGS}
# Multiplier applied to the samples right before they are plotted.
UNIT_SCALE = 0.001

# Plotting constants
# Lower / upper y-axis limits per channel.
Y_LIM_LOW = {channel: low for channel, _unit, low, _high in _CHANNEL_SETTINGS}
Y_LIM_HIGH = {channel: high for channel, _unit, _low, high in _CHANNEL_SETTINGS}
def load_data(file_name):
    """
    Open `file_name` in binary mode and return its content as a list of
    bytes objects, one per line (line endings are kept).
    """
    with open(file_name, 'rb') as handle:
        return handle.readlines()
def process_data(lines):
    """
    Run the whole parsing pipeline on the raw file lines.

    The lines are grouped into chunks, each chunk is interpreted to obtain the
    binary records, the record layout (byte sections) and the endianness, and
    finally the binary records are decoded. Returns the (leads, sources) pair
    produced by process_binary_data.
    """
    binary_lines, byte_sections, endian = process_chunks(chunk_lines(lines))
    return process_binary_data(binary_lines, byte_sections, endian)
def chunk_lines(lines):
    """
    Group the input lines into chunks delimited by lines starting with b'--'.

    A separator line closes the chunk collected so far and is itself dropped.
    A separator met while no chunk is being collected is kept as the first
    line of the next chunk. Lines following the last separator are never
    emitted as a chunk.
    """
    completed = []
    pending = []
    for raw in lines:
        closes_chunk = raw[:2] == b'--' and bool(pending)
        if not closes_chunk:
            pending.append(raw)
            continue
        completed.append(pending)
        pending = []
    return completed
def process_chunks(chunks):
    """
    Processes each chunk based on its MIME type, identified by the guess_mime_type function.

    Textual (HTML/XML) chunks contribute the record layout and the byte order,
    'application/octet-stream' chunks contribute binary records, and
    'text/plain' chunks are ignored.

    Returns the processed binary lines, byte sections and endianess.
    Raises Exception for any other MIME type.
    """
    binary_lines = []
    byte_sections = []
    endian = None
    for chunk in chunks:
        guessed_mime_type, joined_chunk = guess_mime_type(chunk)
        # 'text/xml' is accepted alongside 'application/xml' because the
        # MIME sniffer may report either one for an XML document.
        if guessed_mime_type in ('text/html', 'application/xml', 'text/xml'):
            chunk_endian, byte_sections = process_textual_data(joined_chunk, byte_sections)
            # A textual part without a byte-order declaration must not erase
            # the byte order learned from an earlier part.
            if chunk_endian is not None:
                endian = chunk_endian
        elif guessed_mime_type == 'application/octet-stream':
            binary_lines = process_binary_data_chunk(chunk, binary_lines)
        elif guessed_mime_type == 'text/plain':
            continue
        else:
            raise Exception(f'ERROR: unrecognized MIME type {guessed_mime_type}')
    return binary_lines, byte_sections, endian
def guess_mime_type(chunk):
    """
    Concatenate the lines of a chunk into one bytes object and ask the magic
    module for its MIME type. Returns (guessed MIME type, concatenated chunk).
    """
    payload = b''.join(chunk)
    return magic.from_buffer(payload, mime=True), payload
def process_textual_data(joined_chunk, byte_sections):
    """
    Processes a textual chunk: decodes it from quoted-printable, identifies the
    endianess, and extracts one section descriptor per line that carries a
    'BT=' attribute.

    joined_chunk:  the raw bytes of the chunk.
    byte_sections: descriptors collected so far; the list is not modified.

    Returns (endian, byte_sections) where endian is '>' , '<' or None when the
    chunk declares no byte order, and byte_sections is a new list holding the
    previous descriptors followed by the ones found in this chunk.
    """
    decoded_str = decodestring(joined_chunk)

    endian = None
    if b'bigEndian' in decoded_str:
        endian = '>'
    elif b'littleEndian' in decoded_str:
        endian = '<'

    # Work on a copy so the caller's list is left untouched.
    sections = list(byte_sections)
    # Most recent site="..." value seen; it names the pressure waveforms
    # described on the following lines.
    current_ip_site = None
    for chunk_line in decoded_str.split(b'\n'):
        site_match = re.search(rb'site="([a-zA-Z0-9_]+)"', chunk_line)
        if site_match is not None:
            current_ip_site = site_match.group(1).decode('ascii')
        if b'BT=' in chunk_line:
            # process_byte_data returns a single descriptor, so it is appended
            # rather than assigned over the whole list.
            sections.append(process_byte_data(chunk_line, current_ip_site))
    return endian, sections
def process_byte_data(chunk_line, current_ip_site):
    """
    Builds the section descriptor for one descriptor line (bytes).

    chunk_line:      the line; it must contain one of the known BT="..." types.
    current_ip_site: site name used to label 'ipWaveform' sections; it must be
                     a str when the line describes an 'ipWaveform'.

    Returns a list [unit size in bits, unit count, struct format char, name,
    lead]. The unit count defaults to 1 and name/lead default to '' when the
    corresponding attribute is missing.

    Raises Exception when the binary type is not recognized.
    """
    # (marker, unit size in bits, struct format char), tested in this order.
    # One-byte values use 'B' / '?' so they unpack to numbers; 'c' would
    # produce 1-byte bytes objects instead.
    binary_types = (
        (b'BT="xs:unsignedByte', 8, 'B'),
        (b'BT="xs:short', 16, 'h'),
        (b'BT="xs:unsignedShort', 16, 'H'),
        (b'BT="xs:unsignedInt', 32, 'I'),
        (b'BT="xb:NTP-32', 32, 'L'),
        (b'BT="xb:bool-8', 8, '?'),
    )
    for marker, unit_size, format_char in binary_types:
        if marker in chunk_line:
            break
    else:
        raise Exception('Unrecognized data type')

    # The patterns are matched on the bytes themselves, not on their repr.
    # Extract array size
    m = re.search(rb'asizeBT="([0-9]+)"', chunk_line)
    unit_num = int(m.group(1)) if m is not None else 1

    # Extract name
    m = re.search(rb'<([a-zA-Z]+) ', chunk_line)
    name = m.group(1).decode('ascii') if m is not None else ''

    # Extract lead
    m = re.search(rb'lead="([a-zA-Z0-9]+)"', chunk_line)
    lead = m.group(1).decode('ascii') if m is not None else ''

    # Rename if necessary
    if name == 'ipWaveform':
        name = 'Pressure - ' + current_ip_site
    elif name == 'pleth':
        name = 'Pulse oximetry'
    elif name == 'ecgWaveform':
        name = 'Electrocardiogram'

    return [unit_size, unit_num, format_char, name, lead]
def process_binary_data(binary_lines, byte_sections, endian):
    """
    Decodes every binary record according to the byte sections.

    binary_lines:  list of bytes objects, one per record.
    byte_sections: list of descriptors [unit size in bits, unit count,
                   struct format char, name, lead], in record order.
    endian:        struct byte-order prefix ('<' or '>').

    Returns (leads, sources): two dicts mapping a lead / source name to the
    list of decoded samples, concatenated over all records. A section whose
    samples are all identical within a record is skipped for that record.

    Raises AssertionError when the sections do not cover a record exactly.
    """
    leads = {'I': [], 'II': [], 'III': [], 'AVR': [], 'AVF': [], 'AVL': [], 'V1': []}
    sources = {'Electrocardiogram': [], 'Pulse oximetry': [],
               'Pressure - artery': [], 'Pressure - central_venous': []}

    for binary_data in binary_lines:
        pointer = 0
        for byte_section in byte_sections:
            unit_size, unit_num, section_format, section_name, section_lead = byte_section[:5]
            byte_length = (unit_size * unit_num) // 8
            raw_section = binary_data[pointer:pointer + byte_length]
            pointer += byte_length

            samples = [item[0] for item in iter_unpack(endian + section_format, raw_section)]

            # Only known sources are kept, and only when the section actually
            # varies within this record.
            if section_name not in sources or len(set(samples)) <= 1:
                continue
            # extend() grows the list in place instead of rebuilding it for
            # every section.
            sources[section_name].extend(samples)
            if section_name == 'Electrocardiogram' and section_lead in leads:
                leads[section_lead].extend(samples)

        # Raised explicitly so the check survives `python -O`.
        if pointer != len(binary_data):
            raise AssertionError(
                f'byte sections cover {pointer} bytes but the record has {len(binary_data)}')

    return leads, sources
def process_binary_data_chunk(chunk, binary_lines):
    """
    Extracts the binary payload of a chunk and appends it to binary_lines.

    The payload starts after the 'Content-Transfer-Encoding: binary' header
    line. Blank lines between that header and the first payload line are
    skipped; once the payload has started every line is kept verbatim, so a
    CR LF sequence inside the binary data is preserved. One trailing LF and
    then one trailing CR are removed from the joined payload.

    Returns binary_lines (the same list object). A chunk without any payload
    leaves the list unchanged.
    """
    binary_started = False
    payload_lines = []
    for chunk_line in chunk:
        if not binary_started:
            # Everything up to and including the header line is not payload.
            if chunk_line == b'Content-Transfer-Encoding: binary\r\n':
                binary_started = True
            continue
        # Only blank lines that precede the payload are header separators.
        if chunk_line == b'\r\n' and not payload_lines:
            continue
        payload_lines.append(chunk_line)

    binary_line = b''.join(payload_lines)
    # trailing \n
    if binary_line.endswith(b'\n'):
        binary_line = binary_line[:-1]
    # trailing \r
    if binary_line.endswith(b'\r'):
        binary_line = binary_line[:-1]

    # endswith() is safe on an empty payload; nothing is recorded in that case.
    if binary_line:
        binary_lines.append(binary_line)
    return binary_lines
def plot_data(leads, sources):
    """
    Plots every lead and every source that holds data, one subplot per
    channel (leads first, then sources), using the plot_series function.

    leads / sources: dicts mapping a channel name to its list of samples.
    Nothing is drawn when no channel holds data.
    """
    leads_with_data = [name for name, samples in leads.items() if len(samples) > 0]
    sources_with_data = [name for name, samples in sources.items() if len(samples) > 0]
    num_channels = len(leads_with_data) + len(sources_with_data)
    if num_channels == 0:
        return

    # squeeze=False always yields a 2-D array of axes, so indexing also works
    # when there is a single channel.
    _, axs = plt.subplots(num_channels, squeeze=False)
    axes = axs[:, 0]

    for ax, lead in zip(axes, leads_with_data):
        plot_series(ax, leads[lead], 'EKG - ' + lead, UNITS['EKG'],
                    UNIT_SCALE, Y_LIM_LOW['EKG'], Y_LIM_HIGH['EKG'])

    # Source axes start right after the lead axes, also when there are no leads.
    for ax, source in zip(axes[len(leads_with_data):], sources_with_data):
        # The plotting constants are keyed by 'EKG', while the ECG source is
        # stored under the name 'Electrocardiogram'.
        key = 'EKG' if source == 'Electrocardiogram' else source
        plot_series(ax, sources[source], source, UNITS.get(key, ''),
                    UNIT_SCALE, Y_LIM_LOW.get(key), Y_LIM_HIGH.get(key))

    plt.subplots_adjust(hspace=2.5)
    plt.show()
def plot_series(ax, data, title, units, unit_scale, ylim_low=None, ylim_high=None):
    """
    Plots a series of data on a given axes.

    ax:         the axes to draw on.
    data:       sequence of samples; it is multiplied by unit_scale before plotting.
    title:      subplot title.
    units:      y-axis label.
    ylim_low /
    ylim_high:  y-axis limits, applied only when both are given.
    """
    ax.set_title(title)
    ax.tick_params(labelsize=5)
    ax.plot(np.asarray(data) * unit_scale)
    ax.set_ylabel(units, fontsize=7)
    # Compare against None: a limit of 0 is a valid value and must be applied.
    if ylim_low is not None and ylim_high is not None:
        ax.set_ylim([ylim_low, ylim_high])
def main(file_name='ecg_test2.txt'):
    """
    Loads data, processes it, and then plots the results.

    file_name: path of the recording to read; defaults to 'ecg_test2.txt'.
    """
    # Load data
    lines = load_data(file_name)
    # Process data
    leads, sources = process_data(lines)
    # Plot data
    plot_data(leads, sources)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment