|
#!/usr/bin/python3 |
|
|
|
# sudo -H pip3 install openpyxl pillow pysnmp tqdm |
|
# Get onu list |
|
|
|
__version__ = '0.3' |
|
__copyright__ = "Vladimir Kushnir aka Kvantum i(c)2020" |
|
|
|
import csv |
|
import os |
|
from datetime import datetime, timedelta |
|
from optparse import OptionParser, OptionGroup |
|
|
|
import pysnmp.smi |
|
from openpyxl import Workbook, load_workbook |
|
from openpyxl.styles import Alignment |
|
from openpyxl.utils.cell import get_column_letter |
|
from openpyxl.worksheet.table import Table, TableStyleInfo |
|
from pysnmp.hlapi import * |
|
from pysnmp.proto.rfc1905 import endOfMibView |
|
|
|
from tqdm import tqdm |
|
|
|
# init pysnmp |
|
snmpEngine = SnmpEngine() |
|
contextData = ContextData() |
|
mibBuilder = pysnmp.smi.builder.MibBuilder() |
|
mibViewController = pysnmp.smi.view.MibViewController(mibBuilder) |
|
pysnmp.smi.compiler.addMibCompiler(mibBuilder) |
|
|
|
|
|
def get_options(arguments=None): |
|
"""Parse Command-Line parameters""" |
|
parser = OptionParser(usage="%prog [options] <Device>", |
|
version="%prog " + __version__) |
|
parser.add_option('--vendor', dest='vendor', |
|
choices=['C-DATA', 'VSOL'], |
|
help='PON Vendor') |
|
parser.add_option('--oids', dest='oids', |
|
help='CSV file with list oids and its descriptions to scan') |
|
parser.add_option('--excel', dest='excel', default=datetime.now().strftime("%d-%m-%Y.xlsx"), |
|
help='Excel file to save results') |
|
parser.add_option('--sheet', dest='sheet', default=datetime.now().strftime("%d-%m-%Y"), |
|
help='Excel sheet to save results') |
|
|
|
group = OptionGroup(parser, "SNMP") |
|
group.add_option('--snmp_version', dest='snmp_version', choices=['1', '2c', '3'], default='2c', |
|
help="specifies SNMP version to use") |
|
group.add_option('--snmp_community', dest='snmp_community', default='public', |
|
help='set the community string') |
|
parser.add_option_group(group) |
|
|
|
(opt, args) = parser.parse_args(arguments) |
|
|
|
if opt.snmp_version == '3': |
|
parser.error("SNMP version 3 not supported yet !") |
|
if len(args) < 1: |
|
parser.error("You must specify device IP !") |
|
|
|
return args[0], opt |
|
|
|
|
|
def load_csv(file_name): |
|
data = [] |
|
with open(file_name, newline='') as cf: |
|
reader = csv.DictReader(cf) |
|
for row in reader: |
|
data.append(row) |
|
return data |
|
|
|
|
|
def get_indexes(scan_oid, opt): |
|
indexes = [] |
|
for (errorIndication, errorStatus, errorIndex, varBinds) in \ |
|
bulkCmd(snmpEngine, |
|
opt.auth_data, |
|
opt.transport_target, |
|
contextData, |
|
1, 50, |
|
ObjectType(ObjectIdentity(scan_oid)), lexicographicMode=False): |
|
if errorIndication: |
|
print(errorIndication) |
|
else: |
|
if errorStatus: |
|
print('%s at %s\n' % ( |
|
errorStatus.prettyPrint(), |
|
errorIndex and varBinds[-1][int(errorIndex) - 1] or '?')) |
|
else: |
|
for varBind in varBinds: |
|
scan_oid, value = varBind |
|
if value == endOfMibView: |
|
break |
|
else: |
|
indexes.append(int(value)) |
|
return indexes |
|
|
|
|
|
def get_data(indexes, data, worksheet, col, opt): |
|
for (errorIndication, errorStatus, errorIndex, varBinds) in \ |
|
bulkCmd(snmpEngine, |
|
opt.auth_data, |
|
opt.transport_target, |
|
contextData, |
|
1, 50, |
|
ObjectType(ObjectIdentity(data['oid'])), lexicographicMode=False): |
|
if errorIndication: |
|
print(errorIndication) |
|
else: |
|
if errorStatus: |
|
print('%s at %s\n' % ( |
|
errorStatus.prettyPrint(), |
|
errorIndex and varBinds[-1][int(errorIndex) - 1] or '?')) |
|
else: |
|
for varBind in varBinds: |
|
oid, value = varBind |
|
|
|
if value == endOfMibView: |
|
break |
|
# fix china hardware |
|
for i in range(-1, -5, -1): |
|
sidx = int(oid[i]) |
|
if sidx > 0: |
|
break |
|
# unpack value |
|
try: |
|
row = indexes.index(sidx) + 2 |
|
except ValueError: |
|
indexes.append(sidx) |
|
row = indexes.index(sidx) + 2 |
|
if data['type'] == 'str': |
|
value = str(value) |
|
worksheet.cell(column=col, row=row, value=value) |
|
elif data['type'] == 'int': |
|
if data['multiplier']: |
|
value = int(value) * int(data['multiplier']) |
|
else: |
|
value = int(value) |
|
worksheet.cell(column=col, row=row, value=value) |
|
if data['format']: |
|
worksheet.cell(column=col, row=row).number_format = data['format'] |
|
elif data['type'] == 'float': |
|
if data['multiplier']: |
|
value = float(value) * float(data['multiplier']) |
|
else: |
|
value = float(value) |
|
worksheet.cell(column=col, row=row, value=value) |
|
if data['format']: |
|
worksheet.cell(column=col, row=row).number_format = data['format'] |
|
elif data['type'] == 'time': |
|
if data['multiplier']: |
|
value = timedelta(seconds=(int(value) * data['multiplier'])) |
|
else: |
|
value = timedelta(seconds=int(value)) |
|
worksheet.cell(column=col, row=row, value=value) |
|
elif data['type'] == 'hex': |
|
value = ''.join(['%0.2x' % x for x in value.asNumbers()]) |
|
worksheet.cell(column=col, row=row, value=value) |
|
elif data['type'] == 'mac': |
|
value = data['format'].join(['%0.2x' % x for x in value.asNumbers()]).upper() |
|
worksheet.cell(column=col, row=row, value=value) |
|
elif data['type'] == 'dict': |
|
value = eval(data['format'])[value] |
|
worksheet.cell(column=col, row=row, value=value) |
|
else: |
|
worksheet.cell(column=col, row=row, value=value) |
|
if worksheet.cell(column=1, row=row).value is None: |
|
worksheet.cell(column=1, row=row, value=sidx) |
|
|
|
|
|
if __name__ == '__main__': |
|
host, options = get_options() |
|
if options.snmp_version == '1': |
|
options.auth_data = CommunityData(options.snmp_community, mpModel=0) |
|
elif options.snmp_version.lower() == '2c': |
|
options.auth_data = CommunityData(options.snmp_community, mpModel=1) |
|
try: |
|
ip, port = host.split(':') |
|
except ValueError: |
|
ip = host |
|
port = 161 |
|
options.transport_target = UdpTransportTarget((ip, port), timeout=2, retries=5) |
|
|
|
if os.path.isfile(options.excel): |
|
wb = load_workbook(options.excel) |
|
else: |
|
wb = Workbook() |
|
|
|
oids = load_csv(options.oids) |
|
oid_idx = oids.pop(0) |
|
ws = wb.create_sheet(options.sheet) |
|
column = 2 |
|
ws.cell(row=1, column=1, value='#').alignment = Alignment(horizontal='center', vertical='center') |
|
|
|
print("\nLoading ONU snmp indexes ...") |
|
idxs = get_indexes(oid_idx['oid'], options) |
|
print("Found {} indexes!\n".format(len(idxs))) |
|
|
|
print("Loading snmp values ...") |
|
for oid in tqdm(oids): |
|
ws.cell(row=1, column=column, value=oid['name']).alignment = \ |
|
Alignment(horizontal='center', vertical='center') |
|
get_data(idxs, oid, ws, column, options) |
|
column += 1 |
|
print("All job done!") |
|
|
|
# :TODO Add conditional formatting support |
|
|
|
table_ref = "A1:{}{}".format(get_column_letter(column - 1), len(idxs) + 1) |
|
table_style = TableStyleInfo(name='TableStyleMedium2', showRowStripes=True, showFirstColumn=True) |
|
table = Table(displayName='Table', ref=table_ref, tableStyleInfo=table_style) |
|
ws.add_table(table) |
|
wb.save(options.excel) |