Created
November 3, 2021 15:20
-
-
Save creisor/0ba67406222197593126cb00c32eef5b to your computer and use it in GitHub Desktop.
Nagios check for journalctl
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""A Nagios check for entries with a certain log level in journalctl. | |
For example, to warn when more than 2, and go critical when more than 5, logs with level 'error' have appeared in the journal in the past 5 minutes:
check_journalctl -u my-cool-app --since 5m --log-level error --warning 2.0 --critical 5.0 | |
""" | |
import sys | |
import re | |
import argparse | |
import datetime | |
import subprocess | |
import json | |
import logging | |
def parse_args():
    """Parse the command-line arguments of the check.

    Returns:
        argparse.Namespace with the attributes ``unit_name``, ``since``
        (already converted by ``since_abbreviation``), ``level``,
        ``warning``, ``critical`` and ``verbose``.
    """
    parser = argparse.ArgumentParser(description='Check for a loglevel in syslog for a unit.')
    # NOTE: none of the value-taking options use nargs='?'. With nargs='?' an
    # option given without a value (e.g. a bare "-u") is accepted and stored as
    # None, which only blows up later with a traceback; without it argparse
    # rejects the missing value up front with a usage error.
    parser.add_argument('--unit-name', '-u', type=str, dest='unit_name', required=True,
                        help='the systemd unit name of the program whose log is being checked (see the -u flag for journalctl)')
    parser.add_argument('--since', '-s', dest='since', required=True, type=since_abbreviation,
                        help='how much time in seconds (s), minutes (m), or hours (h) you want to search backwards, (e.g.: "5m" or "600s")')
    parser.add_argument('--log-level', '-l', type=str, dest='level',
                        help='the loglevel to search for', default='error')
    parser.add_argument('--warning', '-w', type=float, dest='warning', default=1.0,
                        help='warning threshold count of entries of "log-level" type (e.g.: if --log-level is error, N errors)')
    parser.add_argument('--critical', '-c', type=float, dest='critical', default=2.0,
                        help='critical threshold count of entries of "log-level" type (e.g.: if --log-level is error, N errors)')
    parser.add_argument('--verbose', '-V', help='Verbose logging', action='store_true')
    return parser.parse_args()
def since_abbreviation(since):
    """Convert a relative-time abbreviation into a journalctl ``--since`` timestamp.

    Args:
        since: A string of the form ``N[smh]``: a non-negative integer followed
            by ``s`` (seconds), ``m`` (minutes) or ``h`` (hours), e.g. "600s",
            "5m", "1h".

    Returns:
        The local time "now minus N units", formatted as "YYYY-MM-DD HH:MM:SS".

    Raises:
        argparse.ArgumentTypeError: if ``since`` is not of the form N[smh], or
            the amount is too large to be subtracted from the current time.
    """
    # fullmatch instead of search with ^...$: `$` also matches just before a
    # trailing newline, so "5m\n" used to slip through.
    match = re.fullmatch(r'(?P<number>\d+)(?P<period>[smh])', since)
    if not match:
        raise argparse.ArgumentTypeError(
            'since abbreviation should be in the form N[smh], e.g.: 600s, 5m, 1h')

    # The regex guarantees the period is one of these keys.
    unit_keywords = {'s': 'seconds', 'm': 'minutes', 'h': 'hours'}
    amount = int(match.group('number'))
    try:
        delta = datetime.timedelta(**{unit_keywords[match.group('period')]: amount})
        start = datetime.datetime.today() - delta
    except OverflowError:
        # Absurdly large amounts overflow timedelta/datetime; report them as a
        # normal argument error so argparse prints usage instead of a traceback.
        raise argparse.ArgumentTypeError(
            'since value is too large: {}'.format(since)) from None

    # from man journalctl:
    # Date specifications should be of the format "2012-10-30 18:17:16". If the time part is omitted, "00:00:00" is assumed.
    return start.strftime("%Y-%m-%d %H:%M:%S")
class Monitor:
    """Count journal entries at one log level and map the count to a Nagios state.

    Each journal entry's MESSAGE field is expected to be a JSON object with
    'level' and 'msg' keys; entries that do not have that shape are ignored.
    """

    # Nagios plugin states: the text printed in the status line and the
    # process exit code that goes with it.
    states = {
        'ok': {'text': 'OK', 'code': 0},
        'warning': {'text': 'WARNING', 'code': 1},
        'critical': {'text': 'CRITICAL', 'code': 2},
        'unknown': {'text': 'UNKNOWN', 'code': 3},
    }

    def __init__(self, unit_name, since, level, warning, critical):
        """Store the check parameters.

        Args:
            unit_name: systemd unit passed to ``journalctl -u``.
            since: timestamp string passed to ``journalctl --since``.
            level: value of the 'level' key to count.
            warning: count above which the state is WARNING.
            critical: count above which the state is CRITICAL.
        """
        self.unit_name = unit_name
        self.since = since
        self.level = level
        self.warning = warning
        self.critical = critical
        self.messages = []

    def check(self):
        """Run the check: read the journal, print the Nagios status line, return the exit code.

        Returns:
            The Nagios exit code: 0 OK, 1 WARNING, 2 CRITICAL, or 3 UNKNOWN when
            journalctl could not be run or exited with an error.
        """
        try:
            logs = self.__get_journal_logs()
        except (OSError, subprocess.CalledProcessError) as err:
            # An uncaught exception would end the process with exit code 1,
            # which Nagios reads as WARNING; a failed check must be UNKNOWN.
            unknown = self.states['unknown']
            print("JOURNALCTL {} - unable to read journal: {}".format(unknown['text'], err))
            return unknown['code']
        self.messages = self._select_messages(logs)
        return self.__eval()

    def _select_messages(self, logs):
        """Return the 'msg' text of every parsed entry whose 'level' equals self.level."""
        # .get() so an entry lacking 'level' or 'msg' cannot raise KeyError;
        # str() so the debug join in __eval never sees a non-string.
        return [str(entry.get('msg', '')) for entry in logs if entry.get('level') == self.level]

    def __eval(self):
        """Compare the number of collected messages with the thresholds and report the state."""
        logging.debug("Evaluating with warning threshold of '%s' and critical threshold of '%s'",
                      self.warning, self.critical)
        metric = len(self.messages)
        logging.debug("%s %s messages: %s", metric, self.level,
                      '"{0}"'.format('", "'.join(self.messages)))
        # Strictly greater-than: a count equal to a threshold does not trip it.
        if metric > self.critical:
            return self.__status(self.states['critical'], metric)
        if metric > self.warning:
            return self.__status(self.states['warning'], metric)
        return self.__status(self.states['ok'], metric)

    def __status(self, state, metric):
        """Print the Nagios status line for `state` and return its exit code."""
        output_template = "JOURNALCTL {} - {} messages at log level '{}'"
        print(output_template.format(state['text'], metric, self.level))
        return state['code']

    def __get_journal_logs(self):
        """Run journalctl for the unit and return the parsed JSON MESSAGE payloads.

        Raises:
            OSError: if journalctl cannot be executed.
            subprocess.CalledProcessError: if journalctl exits non-zero.
        """
        # Argument list without a shell: unit_name comes from the command line
        # and must not be interpreted by /bin/sh. Without a shell the --since
        # value needs no quoting.
        cmd = ['journalctl', '-u', self.unit_name, '-o', 'json', '--since', self.since]
        logging.debug("Running cmd: %s", cmd)
        stdout = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        return self._parse_journal_output(stdout)

    @staticmethod
    def _parse_journal_output(raw):
        """Extract the JSON-object MESSAGE payloads from ``journalctl -o json`` output.

        Args:
            raw: bytes, one JSON document per line; other lines are skipped.

        Returns:
            A list of dicts, one per line whose MESSAGE field is a string that
            itself parses as a JSON object.
        """
        payloads = []
        for line in raw.split(b"\n"):
            try:
                record = json.loads(line.decode('utf-8'))
            except ValueError:
                # Covers both invalid JSON and undecodable bytes.
                continue
            message = record.get('MESSAGE') if isinstance(record, dict) else None
            if not isinstance(message, str):
                # MESSAGE is absent or not a string, so there is nothing to parse.
                continue
            try:
                payload = json.loads(message)
            except ValueError:
                continue
            # A MESSAGE such as "123" is valid JSON but has no 'level'/'msg' keys.
            if isinstance(payload, dict):
                payloads.append(payload)
        return payloads
if __name__ == "__main__":
    # Script entry point: parse the CLI, set up logging, run the check and
    # hand its Nagios exit code back to the caller.
    args = parse_args()

    # --verbose switches the root logger from INFO to DEBUG.
    logging.basicConfig(
        format='%(asctime)-15s - %(message)s',
        level=logging.INFO if not args.verbose else logging.DEBUG,
    )
    logging.debug("Checking logs since {}".format(args.since))

    mon = Monitor(
        args.unit_name,
        args.since,
        args.level,
        args.warning,
        args.critical,
    )
    exit_code = mon.check()
    sys.exit(exit_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment