Skip to content

Instantly share code, notes, and snippets.

@jathanism
Last active November 15, 2019 13:04
Show Gist options
  • Save jathanism/5a892f3eb65cddcca7bd to your computer and use it in GitHub Desktop.
Save jathanism/5a892f3eb65cddcca7bd to your computer and use it in GitHub Desktop.
Run 4 reachability tests on network devices and return a list of results for each.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
reachable.py - Run reachability tests on Devices.

The following tests are run:

- DNS ok
- PTR ok
- ping ok
- SSH ok

End result is a list of ``TestResult`` objects, one for each hostname, with an
``.ok`` attribute.

If all tests passed, the device is 'ok', otherwise it's not and needs to be
checked further.

Example::

    $ python -i reachable.py /tmp/devices.txt
    Testing devices [########################################] 13/13 100%
    Elapsed time: 2.207259 seconds
    >>> ok = [r for r in results if r.ok]
    >>> bad = [r for r in results if not r.ok]
    >>> ok[0]
    <TestResult: foo3a-md01, ok: True>
    >>> ok[0].to_dict()
    {u'dns_ok': True,
     u'fqdn': u'foo3a-md01.net.fake.notreal',
     u'hostname': 'foo3a-md01',
     u'ok': True,
     u'ping_ok': True,
     u'ptr_ok': True,
     u'ssh_ok': True}
    >>> bad[0]
    <TestResult: foo1-br01, ok: False>
    >>> bad[0].to_dict()
    {u'dns_ok': True,
     u'fqdn': u'foo1-br01.net.fake.notreal',
     u'hostname': 'foo1-br01',
     u'ok': False,
     u'ping_ok': False,
     u'ptr_ok': True,
     u'ssh_ok': False}
"""
# NOTE: the module docstring must be the first statement in the file. A string
# placed after the ``__future__`` import is just a discarded expression and
# leaves ``__doc__`` as None, so the import goes below the docstring.
from __future__ import unicode_literals
__author__ = 'Jathan McCollum'
__maintainer__ = 'Jathan McCollum'
__email__ = '[email protected]'
__copyright__ = 'Copyright (c) 2015 Dropbox, Inc.'
__version__ = '0.3.1'
# Requirements:
"""
click
ipaddress
trigger
Twisted
"""
import ipaddress
import re
import sys
import time
import click
from trigger.utils import network
from twisted.internet import task, defer, threads
from twisted.names import client, error
from twisted.python import log
# Domain suffix appended to every bare hostname to build the FQDN that gets
# resolved, pinged and SSH-tested (see ``TestResult.fqdn``).
DOMAIN = '.net.fake.notreal'
# Concurrency limit: used both as the DeferredSemaphore size in ``main`` and
# as the suggested reactor thread pool size in the script entry point.
NUM_JOBS = 40
def reverse_pointer(address):
    """
    Return the reverse-DNS pointer name for ``address``.

    :param address:
        IPv4 or IPv6 address as text or as an ASCII byte string

    :returns:
        The name to use in a PTR query, e.g. ``1.0.0.127.in-addr.arpa``
    """
    # ``ipaddress`` parses text only. Decode byte strings explicitly instead
    # of calling the ``unicode`` builtin, which does not exist on Python 3.
    if isinstance(address, bytes):
        address = address.decode('ascii')
    return ipaddress.ip_address(address).reverse_pointer
class TestResult(object):
    """
    Contains test result values for a Device.

    Every ``*_ok`` flag starts out False and is flipped to True on the
    instance by the corresponding check. Two results compare (and hash) equal
    when their hostnames are equal; a result also compares equal to anything
    whose ``str()`` is its hostname.
    """

    # Class-level defaults; the checks overwrite them per instance.
    dns_ok = False       # forward lookup returned an address
    address = None       # address stored by ``check_address``
    ptr_ok = False       # PTR record matches the FQDN
    ptr_hostname = None  # name stored by ``check_reverse``
    ping_ok = False      # ping check passed
    ssh_ok = False       # SSH check passed

    def __init__(self, hostname):
        """
        :param hostname:
            Bare device hostname; ``DOMAIN`` is appended to build ``fqdn``
        """
        self.hostname = hostname
        self.fqdn = hostname + DOMAIN

    @property
    def ok(self):
        """True only when all four checks passed."""
        return all([self.dns_ok, self.ptr_ok, self.ping_ok, self.ssh_ok])

    def __repr__(self):
        return '<TestResult: %s, ok: %s>' % (self.hostname, self.ok)

    def __str__(self):
        return self.hostname

    def __eq__(self, other):
        # Use the ``==`` operator rather than calling ``__eq__`` directly: the
        # raw method may return NotImplemented (which is truthy) for mixed
        # string types. No throwaway TestResult is needed just to read back
        # ``str(other)``.
        return self.hostname == str(other)

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; without this,
        # ``!=`` would fall back to identity comparison.
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.hostname)

    def to_dict(self):
        """Return the hostname, FQDN and all check flags as a plain dict."""
        return {
            'hostname': self.hostname,
            'fqdn': self.fqdn,
            'ok': self.ok,
            'dns_ok': self.dns_ok,
            'ptr_ok': self.ptr_ok,
            'ping_ok': self.ping_ok,
            'ssh_ok': self.ssh_ok,
        }
@defer.inlineCallbacks
def check_address(hostname):
    """
    Validate forward DNS record.

    Creates the ``TestResult`` for ``hostname`` and resolves its FQDN. On
    success the address is stored on the result and ``dns_ok`` is set.

    :param hostname:
        Device hostname

    :returns:
        Deferred firing with the ``TestResult``
    """
    result = TestResult(hostname)
    log.msg('[%s] start' % result.hostname)

    # Try to resolve the hostname. Any DNS-level failure (no such name,
    # server failure, refused query, ...) or a query timeout simply means the
    # forward check failed. Letting those propagate would errback the whole
    # chain, and the host would never be recorded in the results at all.
    try:
        address = yield client.getHostByName(result.fqdn)
    except (error.DomainError, defer.TimeoutError):
        address = None

    # Validate and store the address
    if address:
        result.address = address
        result.dns_ok = True

    defer.returnValue(result)
@defer.inlineCallbacks
def check_reverse(result):
    """
    Validate reverse DNS PTR.

    Looks up the PTR record for the address stored by the forward check and
    sets ``ptr_ok`` when the first answer equals the result's FQDN. The name
    found is stored in ``ptr_hostname`` either way.

    :param result:
        TestResult object

    :returns:
        Deferred firing with the same ``TestResult``
    """
    # No forward address means there is nothing to reverse.
    if result.address is None:
        defer.returnValue(result)

    ptr = reverse_pointer(result.address)
    log.msg('[%s] pointer: %s' % (result.hostname, ptr))

    # Treat every DNS-level failure or timeout as "PTR check failed" so the
    # result still flows on to the remaining checks and gets recorded.
    try:
        ptr_info = yield client.lookupPointer(ptr)
    except (error.DomainError, defer.TimeoutError):
        defer.returnValue(result)

    answers, _authority, _additional = ptr_info

    # If we got an answer, check if it matches the forward
    if answers:
        # Get the hostname and store it on the TestResult
        reverse_hostname = answers[0].payload.name.name
        result.ptr_hostname = reverse_hostname

        # Validate that forward matches reverse
        result_matches = result.fqdn == reverse_hostname
        log.msg(
            '[%s] reverse matches forward? %s' % (
                result.hostname, result_matches
            )
        )
        if result_matches:
            result.ptr_ok = True
    else:
        # The lookup succeeded but carried no answers. The single-argument
        # call form prints identically on Python 2 and Python 3.
        print("REVERSE LOOKUP GONE HORRIBLY WRONG")

    defer.returnValue(result)
@defer.inlineCallbacks
def check_ping(result):
    """
    Validate reachability on icmp echo.

    The ping is skipped when the forward DNS check did not pass; otherwise it
    runs in a worker thread and ``ping_ok`` is set on success.

    :param result:
        TestResult object

    :returns:
        Deferred firing with the same ``TestResult``
    """
    if result.dns_ok:
        # network.ping blocks, so hand it to the reactor thread pool.
        reachable = yield threads.deferToThread(
            network.ping, result.fqdn, timeout=1
        )
        if reachable:
            result.ping_ok = True

    defer.returnValue(result)
@defer.inlineCallbacks
def check_ssh(result):
    """
    Validate reachability on 22/tcp.

    The SSH test is skipped when the forward DNS check did not pass;
    otherwise it runs in a worker thread and ``ssh_ok`` is set on success.

    :param result:
        TestResult object

    :returns:
        Deferred firing with the same ``TestResult``
    """
    if result.dns_ok:
        # network.test_ssh blocks, so hand it to the reactor thread pool.
        reachable = yield threads.deferToThread(
            network.test_ssh, result.fqdn, timeout=1
        )
        if reachable:
            result.ssh_ok = True

    defer.returnValue(result)
def test_done(result, results, bar):
    """
    Record a finished result and advance the progress bar by one.

    :param result:
        TestResult object

    :param results:
        List used to store results

    :param bar:
        Progressbar object
    """
    results.append(result)
    log.msg('[%s] finish' % result.hostname)
    bar.update(1)
def all_done(_, start_time):
    """
    Display elapsed time.

    :param _:
        Callback result from the deferred; ignored

    :param start_time:
        ``time.time()`` value captured when the run started
    """
    elapsed = time.time() - start_time
    # Single-argument call form: same output under the Python 2 print
    # statement and the Python 3 print function.
    print('Elapsed time: %f seconds' % elapsed)
def main(reactor, hostnames, results):
    """
    Schedule the tests for every hostname and report the elapsed time.

    :param reactor:
        Twisted reactor object

    :param hostnames:
        List of hostnames

    :param results:
        List used to store results

    :returns:
        DeferredList covering every scheduled test run
    """
    started = time.time()  # Start timer

    # Only run NUM_JOBS at once
    limiter = defer.DeferredSemaphore(NUM_JOBS)

    progress = click.progressbar(
        hostnames, length=len(hostnames), label='Testing devices',
        show_pos=True, width=0, empty_char='-', show_percent=True)

    with progress as bar:
        # Each run updates the bar itself when it finishes.
        pending = [
            limiter.run(run_tests, hostname, results, bar)
            for hostname in hostnames
        ]

    finished = defer.DeferredList(pending)
    finished.addCallback(all_done, started)
    return finished
def run_tests(hostname, results, bar):
    """
    Chain all device tests for one hostname asynchronously.

    :param hostname:
        Device hostname

    :param results:
        List used to store results

    :param bar:
        Progressbar object

    :returns:
        Deferred for the whole chain
    """
    # Forward DNS creates the TestResult that every later step receives.
    chain = check_address(hostname)

    # Reverse pointer, ping and SSH, in that order.
    for step in (check_reverse, check_ping, check_ssh):
        chain.addCallback(step)

    # Store result
    chain.addCallback(test_done, results, bar)
    return chain
def filter_results(results):
    """
    Return only the results that no exclusion rule matches.

    :param results:
        List of TestResult objects

    :returns:
        New list with the excluded results removed
    """
    def is_excluded(result):
        """True when the hostname matches an exclusion rule."""
        # Exclude "role=ma +role=ca"
        return re.search(r'-[cm]a0[12]$', result.hostname) is not None

    kept = []
    for candidate in results:
        if not is_excluded(candidate):
            kept.append(candidate)
    return kept
if __name__ == '__main__':
    try:
        hostfile = sys.argv[1]
    except IndexError:
        sys.exit(
            'Run reachability tests on devices.\n\n'
            'usage: %s <hostfile>' % sys.argv[0]
        )

    # Read one hostname per line. Text mode keeps the names as native strings
    # so they can be joined with DOMAIN. Surrounding whitespace is stripped
    # and blank lines are dropped.
    with open(hostfile, 'r') as fh:
        hostnames = [line.strip() for line in fh.read().splitlines()]
    hostnames = [h for h in hostnames if h]

    # Where our results are stored.
    results = []

    # Adjust threadpool to match deferred pool.
    from twisted.internet import reactor
    reactor.suggestThreadPoolSize(NUM_JOBS)

    # Do the damn thing. ``task.react`` always finishes by raising SystemExit,
    # so the summary lists are built in ``finally``; otherwise they would
    # never be defined for an interactive (``python -i``) session.
    try:
        task.react(main, [hostnames, results], reactor)
    finally:
        bad = [r for r in results if not r.ok]
        ok = [r for r in results if r.ok]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment