Created
April 27, 2012 07:33
-
-
Save edma2/2506971 to your computer and use it in GitHub Desktop.
beaglenmt diagnostics web server for http://dev.frozeneskimo.com/embedded_projects/beaglebone_network_multitool_server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# beaglenmt diagnostics web server - Vanya Sergeev <vsergeev at gmail.com>
# http://dev.frozeneskimo.com/embedded_projects/beaglebone_network_multitool_server
#
import copy
import functools
import os
import shlex
import subprocess
import threading
import time

import cherrypy
# Path of the minifortune executable and of the fortune directory passed to it.
FORTUNE_BIN_PATH = "/home/edma2/minifortune"
FORTUNE_DIR_PATH = "/home/edma2/fortunes"

# Timeout handed to the @cached decorator that wraps get().
STATS_UPDATE_INTERVAL = 3

# Interfaces whose addresses and RX/TX counters are reported.
STATS_INTERFACES = [
    "eth0",
    "tun6in4",
]

# Captured once, when the module is loaded.
uname = subprocess.check_output(["uname", "-msnro"])
class ExpireDict(dict):
    """A dictionary whose items count as present only until a timeout elapses.

    Item assignment records a timestamp next to the value, and the ``in``
    operator reports a key as present only while that timestamp is younger
    than ``timeout`` seconds.  Values are never removed; a stale value stays
    stored until it is overwritten.

    Item assignment and membership tests are serialized by ``self.lock``.
    Other ``dict`` operations are inherited unchanged: they take no lock and
    record no timestamp.
    """

    def __init__(self, timeout):
        """Create an empty dictionary whose items stay fresh for *timeout* seconds."""
        super(ExpireDict, self).__init__()
        self.times = {}
        self.timeout = timeout
        self.lock = threading.Lock()

    def __setitem__(self, key, value):
        """Store *value* under *key* and stamp it with the current time."""
        # "with" releases the lock even if the assignment raises (for
        # example on an unhashable key), so later callers cannot deadlock.
        with self.lock:
            self.times[key] = time.time()
            super(ExpireDict, self).__setitem__(key, value)

    def __contains__(self, item):
        """Return True if *item* is stored and has not yet timed out."""
        with self.lock:
            if not super(ExpireDict, self).__contains__(item):
                return False
            return self.expired(item)

    def expired(self, item):
        """Return True while *item* is still within its timeout.

        NOTE: despite its name, this reports freshness, not expiry.  The name
        and meaning are kept so existing callers keep working.  Raises
        ``KeyError`` if *item* has no recorded timestamp.
        """
        return time.time() - self.times[item] < self.timeout
def cached(timeout):
    """Return a decorator that caches a function's results for *timeout* seconds.

    Results are keyed by the tuple of positional arguments, so every argument
    must be hashable; keyword arguments are not supported.  Each call to
    ``cached()`` creates its own ``ExpireDict``.  A stale result is replaced
    on the next call with the same arguments; entries are never evicted.
    """
    cache = ExpireDict(timeout)

    def wrapper(func):
        # Keep the wrapped function's __name__ and __doc__ on the wrapper.
        @functools.wraps(func)
        def wrapped(*args):
            if args not in cache:
                cache[args] = func(*args)
            return cache[args]
        return wrapped
    return wrapper
# Decorator to wrap try/except blocks with an error return
def try_except(err):
    """Return a decorator that makes a function return *err* instead of raising.

    Only ``Exception`` subclasses are converted into the fallback value.
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate, so the process
    can be interrupted or shut down while a wrapped function is running.
    """
    def _try_except(func):
        # Keep the wrapped function's __name__ and __doc__ on the wrapper.
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                return err
        return wrapped
    return _try_except
@try_except("Error fetching fortune!\n")
def fetch_fortune():
    """Run minifortune against the fortune directory and return its output.

    stderr is merged into the captured output.  If the call raises, the
    decorator returns the fallback message instead.
    """
    command = [FORTUNE_BIN_PATH, FORTUNE_DIR_PATH]
    fortune_text = subprocess.check_output(command, stderr=subprocess.STDOUT)
    return fortune_text
@try_except((["Error looking up IPv4 Addresses!"], ["Error looking up IPv6 Addresses!"]))
def fetch_ip():
    """Collect addresses of the configured interfaces using ``ip addr show``.

    Returns a tuple ``(ipv4_addresses, ipv6_addresses)``.  IPv6 addresses are
    kept only when their text starts with "2001".  An interface for which
    ``ip`` exits with an error is skipped; any other failure makes the
    decorator return the pair of error lists.
    """
    v4_found, v6_found = [], []
    for ifname in STATS_INTERFACES:
        ip_command = ["ip", "addr", "show", "dev", ifname]
        try:
            raw_lines = subprocess.check_output(ip_command, stderr=subprocess.STDOUT).split("\n")
            token_rows = [shlex.split(text) for text in raw_lines]
        except subprocess.CalledProcessError:
            # "ip" exited non-zero for this interface; move on to the next.
            continue
        for tokens in token_rows:
            # Rows with fewer than three tokens carry no address to report.
            if len(tokens) <= 2:
                continue
            family, address = tokens[0], tokens[1]
            if family == "inet":
                v4_found.append(address)
            if family == "inet6" and address.startswith("2001"):
                v6_found.append(address)
    return (v4_found, v6_found)
@try_except("Error processing /proc/uptime!")
def fetch_uptime():
    """Return the system uptime formatted as "D days, HH:MM:SS h:m:s".

    The first space-separated field of /proc/uptime is read as a number of
    seconds.  If anything raises, the decorator returns the fallback message.
    """
    # Process /proc/uptime; "with" closes the file even if parsing fails.
    with open('/proc/uptime', 'r') as proc_uptime_f:
        uptime_sec = float(proc_uptime_f.read().split(' ')[0])
    uptime_min, uptime_sec = divmod(uptime_sec, 60)
    uptime_hour, uptime_min = divmod(uptime_min, 60)
    uptime_day, uptime_hour = divmod(uptime_hour, 24)
    return "%d days, %02d:%02d:%02d h:m:s" % (uptime_day, uptime_hour, uptime_min, uptime_sec)
@try_except(["Error processing /proc/net/dev!"])
def fetch_netrxtx():
    """Return one "received / transmitted" summary line per configured interface.

    /proc/net/dev is read and, for every interface named in STATS_INTERFACES,
    the first counter after the colon (bytes received) and the ninth (bytes
    transmitted) are reported in MB.  Lines are produced in STATS_INTERFACES
    order.  If no configured interface is found, or anything raises, a
    one-element list with the error message is returned.
    """
    # Process /proc/net/dev; "with" closes the file even if parsing fails.
    with open('/proc/net/dev', 'r') as proc_netdev_f:
        netdev_lines = proc_netdev_f.read().split('\n')

    netdev_txrx_count = {}
    for line in netdev_lines:
        # Interface rows are "<name>:<counters>"; rows without a colon are
        # skipped.  Splitting on the colon also copes with output where the
        # name and the first counter are not separated by whitespace.
        name, colon, counters = line.partition(':')
        if not colon:
            continue
        # Compare the whole name so "eth0" does not also match "veth0".
        name = name.strip()
        if name not in STATS_INTERFACES:
            continue
        fields = counters.split()
        if len(fields) < 9:
            continue
        netdev_txrx_count[name] = (int(fields[0]), int(fields[8]))

    netrxtx_stat = []
    for iface in STATS_INTERFACES:
        if iface not in netdev_txrx_count:
            continue
        rx_bytes, tx_bytes = netdev_txrx_count[iface]
        netrxtx_stat.append("%s: %.2f MB received, %.2f MB transmitted" % (iface, rx_bytes/1048576.0, tx_bytes/1048576.0))
    if not netrxtx_stat:
        netrxtx_stat = ["Error processing /proc/net/dev!"]
    return netrxtx_stat
@try_except("Error processing /proc/loadavg!")
def fetch_loadavg():
    """Return the first three space-separated fields of /proc/loadavg.

    The fields are re-joined with single spaces.  If anything raises, the
    decorator returns the fallback message.
    """
    # Process /proc/loadavg; "with" closes the file even if reading fails.
    with open('/proc/loadavg', 'r') as proc_loadavg_f:
        loadavg_fields = proc_loadavg_f.read().split(' ')
    return ' '.join(loadavg_fields[0:3])
@try_except("Error processing /proc/meminfo!")
def fetch_memfree():
    """Return free memory as "<free> <unit> / <total> <unit> = <percent>%".

    Values are taken by position from the whitespace-separated tokens of
    /proc/meminfo: tokens 1-2 are used as the total and its unit, tokens 4-5
    as the free amount and its unit.
    NOTE(review): this assumes the first two lines are MemTotal and MemFree;
    confirm on the target kernel.
    If anything raises, the decorator returns the fallback message.
    """
    # Process /proc/meminfo; "with" closes the file even if parsing fails.
    with open('/proc/meminfo', 'r') as proc_meminfo_f:
        tokens = shlex.split(proc_meminfo_f.read())
    total_value, total_unit = tokens[1], tokens[2]
    free_value, free_unit = tokens[4], tokens[5]
    percent_free = (float(free_value)/float(total_value))*100.0
    return "%s %s / %s %s = %.2f%%" % (free_value, free_unit, total_value, total_unit, percent_free)
@cached(STATS_UPDATE_INTERVAL)
def get():
    """Gather every statistic and return ``(stats_text, fortune_text)``.

    The result is cached by the decorator, so repeated calls within the
    cache timeout return the same pair.
    """
    def render_list(heading, entries):
        # The first entry follows the heading on the same line; each later
        # entry goes on its own line, prefixed with one space.  An empty
        # list yields just the heading and a newline.
        if not entries:
            return heading + "\n"
        rows = []
        for position, entry in enumerate(entries):
            prefix = " " if position > 0 else ""
            rows.append(prefix + entry + "\n")
        return heading + "".join(rows)

    # Gather the raw values, in the same order as before.
    fortune_text = fetch_fortune()
    ipv4_addresses, ipv6_addresses = fetch_ip()
    uptime_text = fetch_uptime()
    network_lines = fetch_netrxtx()
    loadavg_text = fetch_loadavg()
    memfree_text = fetch_memfree()
    # Timestamp this update
    updated_at = time.asctime(time.localtime())

    # Assemble the report section by section.
    sections = [
        render_list("IPv4 Addresses: ", ipv4_addresses),
        render_list("IPv6 Addresses: ", ipv6_addresses),
        "\n",
        "Machine/OS: " + uname + "\n",
        "Uptime: " + uptime_text + "\n",
        render_list("Network: ", network_lines),
        "Load Avg: " + loadavg_text + "\n",
        "Mem Free: " + memfree_text + "\n",
        "Last Updated: " + updated_at + "\n",
        "\n",
    ]
    return ("".join(sections), fortune_text)
class Root:
    """CherryPy root handler that serves the statistics page."""

    # Static HTML emitted before and after the per-request content.
    RESPONSE_HEAD = "<html><head><title>beaglenmt</title><style type=\"text/css\">body { background: #1f1f1f; } a { text-decoration: none; } a:link, a:visited { color: #e3ceab; } a:hover { color: #8cd0d3; } #main { background: #3f3f3f; color: #dcdcdc; margin: 50px auto; padding: 15px; width: 600px; font-size: 13px; font-family: 'Helvetica Neue', Helvetica, Arial, Geneva, sans-serif; -webkit-border-radius: 15px; -moz-border-radius: 15px; border-radius: 15px; } #main pre { white-space: pre-wrap; margin: 15px 15px 0 15px; font-family: monospace; font-size: 12px; } #main h3 { margin: 0; text-align: center; line-height: 30px; border-bottom: 1px solid #6c6c6c; font-size: 15px; }</style></head><body><div id=\"main\">\n"
    RESPONSE_FOOT = "Powered by: <a href=\"http://beagleboard.org/bone\">beaglebone</a>, <a href=\"http://buildroot.org/\">buildroot</a>, <a href=\"http://www.cherrypy.org/\">cherrypy</a>, <a href=\"https://github.com/vsergeev/minifortune\">minifortune</a>, <a href=\"https://gist.github.com/2498485\">beaglenmt_httpd.py</a>, <a href=\"http://freedns.afraid.org/\">FreeDNS</a>\n</div></body></html>"

    def build_response_ipv6(self):
        """Return the full HTML page shown to clients that connected over IPv6."""
        latest_stats, latest_fortune = get()
        pieces = [
            self.RESPONSE_HEAD,
            "<h3>Welcome to the <a href=\"http://dev.frozeneskimo.com/embedded_projects/beaglebone_network_multitool_server\">BeagleBone Network Multitool Server</a> via IPv6!<br/></h3>",
            "<pre>\n" + latest_stats + "</pre>\nYou connected via IPv6, you get a fortune!<br/>\n<pre>\n" + latest_fortune + "</pre><br/>\n",
            self.RESPONSE_FOOT,
        ]
        return "".join(pieces)

    def build_response_ipv4(self):
        """Return the full HTML page shown to clients that connected over IPv4."""
        latest_stats, latest_fortune = get()
        pieces = [
            self.RESPONSE_HEAD,
            "<h3>Welcome to the <a href=\"http://dev.frozeneskimo.com/embedded_projects/beaglebone_network_multitool_server\">BeagleBone Network Multitool Server</a> via IPv4!<br/></h3>",
            "<pre>\n" + latest_stats + "</pre>\nYou connected via IPv4, you get a fortune too!<br/>\n<pre>\n" + latest_fortune + "</pre><br/>\n",
            self.RESPONSE_FOOT,
        ]
        return "".join(pieces)

    def index(self):
        """Serve the page variant that matches the client's address family."""
        client_ip = cherrypy.request.remote.ip
        # Check if this was an IPv4-mapped-to-IPv6 address
        if client_ip.startswith("::ffff"):
            return self.build_response_ipv4()
        return self.build_response_ipv6()
    index.exposed = True
# Both log files are tagged with the process id of this server.
_server_pid = os.getpid()
_server_settings = {
    'server.socket_host': '::',
    'server.socket_port': 8080,
    'server.thread_pool': 20,
    'server.socket_queue_size': 15,
    'log.access_file': "/tmp/beaglenmt_httpd_%d_access.log" % _server_pid,
    'log.error_file': "/tmp/beaglenmt_httpd_%d_error.log" % _server_pid,
    'log.screen': False,
    'environment': 'production',
}
cherrypy.config.update(_server_settings)

# Optional plugins, left disabled:
#cherrypy.process.plugins.PIDFile(cherrypy.engine, '/tmp/beaglenmt_httpd.pid').subscribe()
#cherrypy.process.plugins.DropPrivileges(cherrypy.engine, umask=0644, uid=99, gid=99).subscribe()

# Mount the handler at "/" with static-file handling off for /favicon.ico.
_app_config = {'/favicon.ico': {'tools.staticfile.on': False}}
cherrypy.quickstart(Root(), '/', config=_app_config)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment