# The HubData class of the Librarian
"""data_tools: data tools including classes, methods and attributes | |
Provides a variety of classes to hold, transfer, analyze, transform and query | |
the various data in the data library and in the imagehubs accessible to the | |
librarian. | |
Copyright (c) 2018 by Jeff Bass. | |
License: MIT, see LICENSE for more details. | |
""" | |
import sys
import pprint
import logging
import threading
import subprocess
from time import sleep
from pathlib import Path
from datetime import datetime
from collections import deque
from helpers.utils import YamlOptionsError

class HubData:
    """ Methods and attributes to transfer data from imagehub data files

    Provides methods for Librarian to access imagehub data, including event
    logs and images stored by the imagehub.

    Parameters:
        settings (Settings object): settings object created from YAML file
    """
    def __init__(self, settings):
        # log directory and log file refer to the event log of the imagehub
        ld = Path(settings.log_directory)
        if not ld.exists():
            raise YamlOptionsError('Log directory in YAML file does not exist.')
        elif not ld.is_dir():
            raise YamlOptionsError('Log directory in YAML file is not a directory.')
        lf = ld / settings.log_file
        if not lf.exists():
            raise YamlOptionsError('Log file in YAML file does not exist.')
        elif not lf.is_file():
            raise YamlOptionsError('Log file in YAML file is not a file.')
        self.log_file = lf  # fully expanded name of current event log file
        self.log_dir = ld
        self.max_lines = 1000  # max number of lines to load from log file; add this to YAML file later?
        self.maxlen = 3  # maybe settings.queuemax later? Also, maybe different for water node deque
        self.event_data = {}  # see description in load_log_data docstring
        self.newest_log_line = ''  # keep track of last text line read from log
        self.line_count = 0  # total lines read into event_data since program startup; useful for librarian status
        self.event_data_lock = threading.RLock()
        self.load_log_data(ld, lf, self.max_lines)  # initial load of self.event_data
        # pprint.pprint(self.event_data)
        # start the thread that adds data to self.event_data as new lines
        # are added to the imagehub log
        self.log_check_interval = 2  # seconds: how often to check for added log lines
        t = threading.Thread(target=self.watch_for_new_log_lines)
        # print('Starting watch_for_new_log_lines thread.')
        t.daemon = True  # allows this thread to be auto-killed on program exit
        t.name = 'watch_for_new_log_lines'  # naming the thread helps with debugging
        t.start()
""" # this is the block of lines used to test self.add_new_log_lines() | |
print('Total number of lines read from all log files:', self.line_count) | |
print('BEFORE call to add_new_log_lines: Newest log line:', self.newest_log_line) | |
# testing the add lines modules | |
query = input('Add some lines to imagehub.log, then press enter. ') | |
self.add_new_log_lines() | |
print('AFTER call to add_new_log_lines: Newest log line:', self.newest_log_line) | |
pprint.pprint(self.event_data) | |
print('Total number of lines read from all log files:', self.line_count) | |
print('End of test') | |
sys.exit() """ | |
    def load_log_data(self, ld, lf, max_lines):
        """ read the imagehub log file(s), loading the most recent max_lines lines

        Event log files are rotating log files:
        lf.log, lf.log.1, lf.log.2, lf.log.3, ... with lf.log being newest.
        Reads the most recent max_lines lines from the log files, which may
        involve reading more than one of the log files. Reads one line at a
        time, adding the event data to the self.event_data dict(), which is a
        nested dictionary. Example data values from self.event_data:

        node     event           deque of tuples of data values
          |        |                          |
        event_data['barn']['motion']  values[0] = (datetime, 'moving')  # current
                                      values[1] = (datetime, 'moving')  # previous
                                      values[2] = (datetime, 'moving')  # earlier

        Each data tuple is (datetime, event_value), where each event_value is
        a measure like "77 degrees" or a state like "motion". This deque is of
        fixed length, so as new data points are appended at the left, data
        points beyond maxlen are discarded from the event_data dictionary (but
        not from the event log files; those are "read only" from the
        perspective of the librarian; they are written ONLY by the imagehub
        program).

        Parameters:
            ld (PosixPath): log directory containing event log files
            lf (PosixPath): most recent and current log file (e.g. hub.log)
            max_lines (int): number of lines to read from imagehub log file(s)
        """
        logs = [log for log in ld.glob('*log*')]  # list of log files
        logs.sort()  # likely to already be sorted OK, but be sure
        # compute number of log files to read to accumulate at least max_lines
        cum_lines = 0
        # print('max_lines:', max_lines)
        for log_n, log in enumerate(logs):
            with open(log) as f:  # use f, not lf, to avoid shadowing the lf parameter
                lines = sum(1 for _ in f)
            cum_lines += lines
            # print(log, lines, cum_lines, log_n)
            if cum_lines >= max_lines:
                break
        # print('Log files to read are:')
        # now read the required number of log files to load self.event_data
        for log in reversed(logs[:log_n + 1]):  # oldest selected file first
            with open(log, 'r') as f:
                lines = f.readlines()
            self.load_log_event_lines(lines)
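    # Illustration of the selection logic above (file names assumed for the
    # example): with rotated logs ['imagehub.log', 'imagehub.log.1',
    # 'imagehub.log.2'] and max_lines=1000, the first loop counts lines
    # newest-file-first and stops at the first file where the cumulative
    # count reaches 1000; the second loop then re-reads the selected files
    # oldest-first, so events load in chronological order and appendleft()
    # leaves the newest event at index 0 of each deque.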
    def load_log_event_lines(self, lines):
        """ loads lines from a log file into the event_data dict()

        Parameters:
            lines (list): lines from an imagehub event log file
        """
        if not lines:  # nothing to load; keep newest_log_line unchanged
            return
        for line in lines:
            self.line_count += 1
            # node_tuple is (node, event, when, value)
            node_tuple = self.parse_log_line(line)
            self.load_log_event(node_tuple)
        self.newest_log_line = lines[-1]
    def load_log_event(self, node_tuple):
        """ load a single node event into the self.event_data dict()

        Creates a single entry in the self.event_data dict() that holds all
        the events logged from imagenodes.

        All string values (node, event, value) are stripped of whitespace and
        converted to lower case. The when field is a datetime value and is
        stored as is.

        Parameters:
            node_tuple (tuple): parsed values from a single event log line
        """
        # node_tuple is (node, event, when, value)
        node = node_tuple[0].strip().lower()
        event = node_tuple[1].strip().lower()
        when = node_tuple[2]
        value = node_tuple[3].strip().lower()
        with self.event_data_lock:
            if node not in self.event_data:
                self.event_data[node] = {}
            if event not in self.event_data[node]:
                self.event_data[node][event] = deque(maxlen=self.maxlen)
            self.event_data[node][event].appendleft((when, value))
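    # For reference, the bounded-deque behavior the method relies on: with
    # maxlen=3, a fourth appendleft() silently drops the oldest (rightmost)
    # entry, so index 0 always holds the most recent event. For example:
    #   d = deque(maxlen=3)
    #   for v in ('a', 'b', 'c', 'd'):
    #       d.appendleft(v)
    #   # d is now deque(['d', 'c', 'b'], maxlen=3)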
    def parse_log_line(self, line):
        """ parse a single line from a log file, returning a tuple of values

        Example: input log data lines like these:
        2020-06-09 18:27:11,776 ~ Driveway Mailbox|motion|moving
        2020-06-09 18:33:15,788 ~ Barn|Temp|83 F
        return tuples like these:
        (Driveway Mailbox, motion, datetime, moving)
        (Barn, Temp, datetime, 83)

        Parameters:
            line (str): a single log line read from a log file
        Returns:
            tuple (node, event, when, value)
        """
        two_parts = line.split('~')
        part1 = two_parts[0].strip()
        when = datetime.strptime(part1, "%Y-%m-%d %H:%M:%S,%f")
        part2 = two_parts[1].rstrip(' F\n').strip().split('|')
        if len(part2) < 3:  # this is not a node message; system or other msg
            node = 'non-node'
            event = 'other'
            value = part2[0]  # there will be at least one string
        else:
            node = part2[0]  # e.g. barn
            event = part2[1]  # e.g. motion
            value = part2[2]  # e.g. still
        return node, event, when, value
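    # A quick illustration of parse_log_line using the docstring examples
    # (interactive-style output shown as comments; the microseconds come from
    # the ',776' fragment of the timestamp):
    #   >>> hub.parse_log_line('2020-06-09 18:27:11,776 ~ Driveway Mailbox|motion|moving\n')
    #   ('Driveway Mailbox', 'motion', datetime(2020, 6, 9, 18, 27, 11, 776000), 'moving')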
    def watch_for_new_log_lines(self):
        """ watch_for_new_log_lines: thread to fetch newly added log lines
        """
        while True:
            self.add_new_log_lines()
            sleep(self.log_check_interval)
    def add_new_log_lines(self):
        """ add new event log data lines to self.event_data dict()

        Runs in a thread that is started when HubData is instantiated.
        Checks imagehub event log file(s) for any changes by using the linux
        "tail" utility (chosen because it is very fast and does NOT read the
        entire file as a Python program would need to). Adds any newly added
        event log lines to the self.event_data dict().

        Algorithm:
        1. tail n_lines from the current log file
        2. if newest_log_line is the last line in the tail, return; no new lines
        3. if the tail of n_lines includes the newest_log_line, then
           load_log_event_lines from that line to the log end
        4. else do a tail with more lines, until either newest_log_line is
           found or the entire last 2 log files have been returned
        """
        # get n_lines from the tail of the log file and check whether it
        # contains self.newest_log_line
        try_n_lines = [10, 20, 40, 80, 160, 320, 640, 1024, 2048, 4096]
        for n_lines in try_n_lines:
            # OS command equivalent: tail -n n_lines self.log_file
            lines = self.log_tail(n_lines)
            if not lines:  # log file is empty; nothing to add
                return
            # print("A: len(lines) is ", len(lines), 'n_lines:', n_lines)
            """ for i, l in enumerate(lines):
                print('Line', i, ':', l)
            print('B: Comparison of lines[-1]:')
            print('B: and self.newest_log_line')
            print(lines[-1][:23])
            print(self.newest_log_line[:23])
            assert lines[-1][:23] == self.newest_log_line[:23], "First 23?"
            assert lines[-1] == self.newest_log_line, "Full length not equal!"
            print('After assert.') """
            if lines[-1][:30] == self.newest_log_line[:30]:
                # print('C: right before return')
                return  # there are no new lines in the log file
            # print('D: after lines[-1] comparison:')
            # print("D: len(lines) is ", len(lines), 'n_lines:', n_lines)
            if len(lines) > n_lines:  # a 2nd log file was added; load all lines
                self.load_log_event_lines(lines)
                return
            for n, line in enumerate(lines):  # is the newest log line in the tail?
                if line[:30] == self.newest_log_line[:30]:  # found a matching line
                    # print('About to add lines from', n, ' to ', len(lines)-1)
                    self.load_log_event_lines(lines[n + 1:])
                    return
        return
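    # Illustration of the escalating tail sizes above: if the 10-line tail no
    # longer contains newest_log_line (i.e., more than 10 lines were appended
    # since the last check), the loop retries with 20, 40, ... up to 4096
    # lines; and if log_tail ever returns more lines than requested (meaning
    # a 2nd log file was combined in), everything returned is loaded.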
    def log_tail(self, n_lines):
        """ uses linux "tail" command to get last n_lines from newest log file

        Called by add_new_log_lines in a thread that is started when HubData
        is instantiated.

        If n_lines exceeds the number of lines in the current log file,
        combine with the earlier log file; current limit is 1 earlier log
        file.

        Parameters:
            n_lines (int): number of lines to "tail" from the log file(s)
        Returns:
            lines (list): lines returned by running the OS command "tail -n n_lines"
        """
        # pass -n and the line count as separate argv items for robustness
        tail = subprocess.run(['tail', '-n', str(n_lines), str(self.log_file)],
                              capture_output=True, text=True)
        lines = tail.stdout.splitlines()
        if len(lines) < n_lines:  # we got fewer lines than requested;
            # add the entire 2nd log file and return both log files in lines
            logs = [log for log in self.log_dir.glob('*log*')]  # list of log files
            logs.sort()  # likely to already be sorted OK, but be sure
            if len(logs) > 1:  # guard: a rotated 2nd log file may not exist yet
                with open(logs[1], 'r') as f:
                    lines1 = f.read().splitlines()  # match splitlines() form of tail output
                lines1.extend(lines)  # both log files are combined; newest last
                return lines1
        # print('Number of lines returned from log_tail:', len(lines))
        return lines
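    # A hedged, portable alternative sketch (not used by this class): the
    # same "read only the end of the file" effect without spawning a
    # subprocess, by seeking backwards in binary mode. The method name
    # _py_tail is hypothetical and included for illustration only.
    def _py_tail(self, n_lines, chunk_size=8192):
        """ pure-Python sketch of 'tail -n n_lines' on the current log file """
        with open(self.log_file, 'rb') as f:
            f.seek(0, 2)  # move to end of file
            pos = f.tell()
            data = b''
            # read backwards in chunks until enough newlines are collected
            while pos > 0 and data.count(b'\n') <= n_lines:
                step = min(chunk_size, pos)
                pos -= step
                f.seek(pos)
                data = f.read(step) + data
        return data.decode().splitlines()[-n_lines:]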
    def fetch_event_data(self, node, event):
        """ fetch some specified data from event logs or images

        This fetches data from the self.event_data dict() that holds event
        data. Data returned is either 'current' or 'previous', or both, where
        'current' is the most recent logged event for a node, and 'previous'
        is the one immediately preceding it.

        Returned data values are always a string, even if representing a
        number.

        Parameters:
            node (str): what node to fetch data for, e.g., barn
            event (str): what event or measurement, e.g. temperature or motion
        Returns:
            (2 tuples): (current, previous), with each tuple containing:
                datetime (datetime): the datetime associated with the event
                value (str): the fetched data item, e.g. '77' for temperature
        """
        node = node.strip().lower()    # all string values in event_data are
        event = event.strip().lower()  # already stripped and lower case
        with self.event_data_lock:  # acquire lock to work with event_data updates
            node_events = self.event_data.get(node, None)
            if node_events:
                event_deque = node_events.get(event, None)
                if event_deque:
                    current = event_deque[0]  # the most recent date & value
                    if len(event_deque) > 1:
                        previous = event_deque[1]  # the previous date & value
                    else:
                        previous = None
                    return (current, previous)
                else:
                    return None, " ".join(["Don't know", node, event])
            else:
                return None, " ".join(["Don't know", node])