Skip to content

Instantly share code, notes, and snippets.

@TerryGeng
Created November 26, 2020 15:13
Show Gist options
  • Save TerryGeng/997600cfd681801aba49277e7d9a08dd to your computer and use it in GitHub Desktop.
Save TerryGeng/997600cfd681801aba49277e7d9a08dd to your computer and use it in GitHub Desktop.
Utility to receive the UDP binary log from a Huawei Router/Firewall.
"""
Utility to receive the UDP binary log from a Huawei Router/Firewall.
Author: Terry Geng
Date: 2020-11-26
"""
import socket
import select
import os
import string
import time
from datetime import datetime, timedelta
BIND_ADDR = "0.0.0.0"
BIND_PORT = 9002
FILE_NAME_FORMAT = "/var/log/remote/{hostname}/{date}T{hour}.blog"
KNOWN_HOST = {
('210.28.143.205', 20000): 'Huawei-LB02'
}
class UDPServer:
def __init__(self, bind_addr, bind_port):
self.bind_addr = BIND_ADDR
self.bind_port = BIND_PORT
self.whitelist = []
self.blacklist = []
self.data_hooks = []
self.exit_hooks = []
def start(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((BIND_ADDR, BIND_PORT))
sock.setblocking(0)
inputs = [sock]
try:
wl = self.whitelist
bl = self.blacklist
while True:
infds, _, exfds = select.select(inputs, [], inputs, 5)
if len(infds) == 0:
continue
for infd in infds:
data, addr = infd.recvfrom(1024)
if addr in bl:
continue
if wl:
if addr not in wl:
continue
for hook in self.data_hooks:
hook(data, addr)
except KeyboardInterrupt:
for hook in self.exit_hooks:
hook()
class Template:
def __init__(self, template):
self.template = template
fmt = string.Formatter()
self.used_keys = \
[i[1] for i in filter(lambda tpl: tpl[1], fmt.parse(template))]
self.next_update_time = self.compute_next_update_time(self.used_keys)
def format(self, **kwargs):
self.next_update_time = self.compute_next_update_time(self.used_keys)
t = datetime.now()
kwargs.update(
date=t.strftime("%Y-%m-%d"),
day=t.day,
month=t.month,
year=t.year,
hour=t.hour,
minute=t.minute,
second=t.second
)
return self.template.format(**kwargs)
@staticmethod
def compute_next_update_time(keys):
ret = 0
dt = datetime.now()
if 'second' in keys:
ret = datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute,
dt.second) + timedelta(seconds=1)
elif 'minute' in keys:
ret = datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute,
0) + timedelta(mintues=1)
elif 'hour' in keys:
ret = datetime(dt.year, dt.month, dt.day, dt.hour, 0, 0)\
+ timedelta(hours=1)
elif 'date' in keys or 'day' in keys:
ret = datetime(dt.year, dt.month, dt.day, 0, 0, 0)\
+ timedelta(days=1)
elif 'month' in keys:
ret = datetime(dt.year, dt.month, 0, 0, 0, 0) + timedelta(months=1)
elif 'year' in keys:
ret = datetime(dt.year, 0, 0, 0, 0, 0) + timedelta(years=1)
else:
return 0
return ret.timestamp()
class LogFileWritter:
def __init__(self, name_template, hostname_lookup):
self.fds = {}
self.name_template = Template(name_template)
self.hostname_lookup = hostname_lookup
self.filename_update_time = 0
def write_to_file(self, data, addr):
if self.filename_update_time and time.time() > self.filename_update_time:
self.close_files()
if addr not in self.fds:
self.open_file(addr)
self.fds[addr].write(data)
def open_file(self, addr):
name = self.name_template.format(
hostname=self.hostname_lookup[addr],
addr=f"{addr[0]}:{addr[1]}"
)
self.filename_update_time = \
max(self.filename_update_time,
self.name_template.next_update_time)
dirs = os.path.dirname(name)
if not os.path.exists(dirs):
os.makedirs(dirs)
self.fds[addr] = open(name, "ab")
def close_files(self):
for fd in self.fds.values():
fd.close()
self.fds.clear()
if __name__ == "__main__":
server = UDPServer(BIND_ADDR, BIND_PORT)
server.whitelist = KNOWN_HOST.keys()
logger = LogFileWritter(FILE_NAME_FORMAT, KNOWN_HOST)
server.data_hooks.append(logger.write_to_file)
server.exit_hooks.append(logger.close_files)
server.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment