Last active
June 28, 2022 11:32
-
-
Save dorkmatt/e04a10e3a8c8b60ec4b17babd376f599 to your computer and use it in GitHub Desktop.
Netbox Juniper/Arista optics audit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
#
# Fetch list of Juniper/Arista devices via Netbox, query optics info via Netconf, write to CSV file
#
# Written at 34c4 in Leipzig, Germany
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Matt Peterson <[email protected]>
import sys | |
import os | |
import pwd | |
import json | |
import pprint | |
import csv | |
import re | |
try: | |
from jnpr.junos import Device | |
from lxml import etree | |
from netaddr import * | |
import pynetbox | |
from netmiko import ConnectHandler | |
except ImportError, e: | |
sys.exit("Please install required python modules: junos-eznc lxml netaddr pynetbox netmiko") | |
# Login name of the account running this script; it is handed to the Juniper
# collector as the Netconf user.
username = pwd.getpwuid(os.getuid()).pw_name  # assume current user login name
# Pretty-printer used to dump the collected optics list at the end of a run.
pp = pprint.PrettyPrinter(indent=4)
def fetch_netbox(brand):
    """Audit every active Netbox device made by *brand*.

    Lists the devices Netbox returns for ``manufacturer=brand, status=1`` and
    hands each one that has a primary IPv4 address to the matching collector
    (``fetch_juniper_inventory`` or ``fetch_arista_inventory``).

    brand -- manufacturer slug; "juniper" and "arista" are the values this
             function dispatches on.

    Exits the process if the pynetbox API object cannot be created.
    """
    # NOTE(review): URL and token are placeholders hard-coded in the script.
    try:
        netbox = pynetbox.api('https://netbox/',token='xyz')
    except AttributeError:
        sys.exit("Failed to connect to netbox instance")

    for device in netbox.dcim.devices.filter(manufacturer=brand,status=1):
        host = str(device)
        print("### Netbox: " + host)

        # Read the address from the device record itself. The previous code
        # converted the record to a string first and then asked that string
        # for .primary_ip4, which raises AttributeError on every device.
        primary_ip4 = device.primary_ip4

        if primary_ip4 is not None and brand == "juniper":
            # Netbox stores the address with its prefix length; keep the IP only.
            v4_address = IPNetwork(str(primary_ip4)).ip
            fetch_juniper_inventory(host,v4_address,username)
        elif primary_ip4 is not None and brand == "arista":
            v4_address = IPNetwork(str(primary_ip4)).ip
            fetch_arista_inventory(host,v4_address)
        else:
            print("## No IPv4 address found for " + host)
def fetch_arista_inventory(host,v4_address):
    """Collect transceiver details from one Arista EOS switch over SSH.

    For every interface that reports a ``mediaType`` in
    ``show interfaces transceiver | json``, a record is appended to the
    module-level ``optics_list`` and a row is written via ``write_csv_file``.

    host       -- device name, used for logging and in the emitted records.
    v4_address -- management address to connect to (converted with ``str``).

    Returns False when the SSH session cannot be established, otherwise None.
    """
    arista_options = {
        'device_type': 'arista_eos',
        'ip': str(v4_address),
        'username': 'admin',
        'key_file': "../access/creds/id_rsa"
    }
    # Log the account the session really uses, not the local login name.
    print("### Connecting " + arista_options['username'] + "@" + host)
    try:
        arista_device = ConnectHandler(**arista_options)
    except Exception as e:
        print(e.__doc__)
        # str(e) instead of the deprecated e.message, which Python 3 lacks.
        print(str(e))
        return False

    # Everything below talks to the device; always release the session.
    try:
        prompt = arista_device.find_prompt().replace("#","")
        show_ver_json_output = json.loads(arista_device.send_command("show version | json"))
        print("## Connected to " + prompt + " S/N (" + show_ver_json_output['serialNumber'] + ")")

        show_interfaces_transceiver = json.loads(arista_device.send_command("show interfaces transceiver | json"))
        show_inventory = json.loads(arista_device.send_command("show inventory | json"))

        for key,value in show_interfaces_transceiver["interfaces"].items():
            if 'mediaType' not in value:
                continue

            raw_wavelength = arista_device.send_command('show interfaces ' + key + ' transceiver hardware | include ^Wavelength')
            # The filter can return nothing, in which case there is no
            # "label: value" pair to split and the wavelength stays unknown.
            wavelength_parts = raw_wavelength.split(": ")
            if len(wavelength_parts) > 1:
                wavelength = wavelength_parts[1] + " nm"
            else:
                wavelength = None

            # Inventory slots are looked up by the digits of the interface name.
            # NOTE(review): all digit groups are concatenated, so a name such as
            # "Ethernet49/1" yields "491" - confirm this matches xcvrSlots keys.
            interface_numeric_name = ''.join(re.findall(r'\d+', key))
            slot = show_inventory['xcvrSlots'][interface_numeric_name]

            optics_list.append({
                "host": host,
                "port": key,
                "description": value['mediaType'],
                "cable_type": value['mediaType'],
                "sfp-vendor-name": slot['mfgName'],
                "sfp-vendor-pno": slot['modelName'],
                "part-number": slot['modelName'],
                "serial-number": value['vendorSn'],
                "wavelength": wavelength,
            })
            write_csv_file(fields=[
                host,
                slot['mfgName'],
                slot['modelName'],
                value['mediaType'],
                value['vendorSn'],
            ])
    finally:
        arista_device.disconnect()
def fetch_juniper_inventory(host,v4_address,username):
    """Collect transceiver details from one Juniper device over Netconf.

    Walks every chassis module whose name starts with "FPC", queries PIC
    detail for each PIC directly below it, and records one entry per port
    that reports a port number (see ``_record_juniper_optic``).

    host       -- device name, used for logging and in the emitted records.
    v4_address -- management address to connect to (converted with ``str``).
    username   -- login name for the Netconf session.

    Returns False when the session cannot be opened, otherwise None.
    """
    print("### Connecting " + username + "@" + host)
    junos_device = Device(host=str(v4_address), user=username, timeout=15) # assume ~/.ssh/config with correct key
    try:
        junos_device.open()
    except Exception as e:
        print(e.__doc__)
        return False

    # Close the session even if an RPC or a lookup below raises.
    try:
        print("## Connected to " + junos_device.facts['hostname'] + " S/N (" + junos_device.facts['serialnumber'] + ")")
        chassis_inventory = junos_device.rpc.get_chassis_inventory()

        for item in chassis_inventory.findall('chassis/chassis-module/name'):
            if not str(item.text).startswith("FPC"):
                continue
            # "FPC 0" -> " 0": the leading space is kept on purpose, the
            # inventory XPath below concatenates it straight after "FPC".
            fpc_slot = item.text.lower().replace("fpc","")

            # Only look at PICs that belong to this FPC. Searching the whole
            # chassis repeated every PIC once per FPC on multi-FPC systems.
            fpc_module = item.getparent()
            for pic in fpc_module.xpath("chassis-sub-module/name[contains(text(),'PIC')]"):
                pic_slot = pic.text.replace("PIC ","")
                pic_detail = junos_device.rpc.get_pic_detail(fpc_slot=fpc_slot, pic_slot=pic_slot, normalize=True)
                # Handle each <port> separately. Folding all ports into one
                # dict kept only the last port of the PIC.
                for port in pic_detail.findall("fpc/pic-detail/port-information/port"):
                    _record_juniper_optic(host, chassis_inventory, fpc_slot, pic_slot, port)
    finally:
        junos_device.close()


def _record_juniper_optic(host, chassis_inventory, fpc_slot, pic_slot, port):
    """Record one port's optic in ``optics_list`` and the CSV file."""
    # Example of the values gathered from the PIC detail of one port:
    # { 'cable-type': '100GBASE LR4',
    #   'fiber-mode': 'SM',
    #   'port-number': '0',
    #   'sfp-vendor-fw-ver': '1.0',
    #   'sfp-vendor-name': 'SumitomoElectric',
    #   'sfp-vendor-pno': 'SCF1001L4LNJ101',
    #   'wavelength': '1310 nm'}
    pic_values = {node.tag: node.text for node in port.findall(".//*")}
    port_number = pic_values.get('port-number')
    if not port_number:
        return

    # Locate the matching "Xcvr N" entry in the chassis inventory, e.g.
    # { 'description': 'QSFP+-40G-SR4',
    #   'name': 'Xcvr 0',
    #   'part-number': 'NON-JNPR',
    #   'serial-number': '482943AK004W'}
    xpath_query = 'chassis/chassis-module/name[text()="FPC' + fpc_slot + '"]/parent::*//chassis-sub-module/name[text()="PIC ' + pic_slot + '"]/parent::*/chassis-sub-sub-module/name[text()="Xcvr ' + port_number + '"]/parent::chassis-sub-sub-module'
    inventory_matches = chassis_inventory.xpath(xpath_query)
    # No inventory entry: still report what the PIC detail provided.
    if inventory_matches:
        port_values = {node.tag: node.text for node in inventory_matches[0]}
    else:
        port_values = {}

    cable_type = pic_values.get('cable-type')
    if cable_type is not None:
        cable_type = cable_type.replace(" ","-")

    optics_list.append({
        "host": host,
        "fpc": fpc_slot,
        "pic": pic_slot,
        "port": port_number,
        "description": port_values.get('description'),
        "cable_type": cable_type,
        "sfp-vendor-name": pic_values.get('sfp-vendor-name'),
        "sfp-vendor-pno": pic_values.get('sfp-vendor-pno'),
        "part-number": port_values.get('part-number'),
        "serial-number": port_values.get('serial-number'),
        "wavelength": pic_values.get('wavelength'),
    })
    write_csv_file(fields=[
        host,
        pic_values.get('sfp-vendor-name'),
        pic_values.get('sfp-vendor-pno'),
        port_values.get('description'),
        port_values.get('serial-number'),
    ])
def write_csv_file(fields):
    """Append *fields* as one fully-quoted row to ./optics_audit.csv."""
    with open(r'./optics_audit.csv', 'a') as csv_handle:
        csv.writer(csv_handle, quoting=csv.QUOTE_ALL).writerow(fields)
if __name__ == "__main__":
    # Module-level accumulator that the collectors append their records to.
    optics_list = []
    # Audit Arista first, then Juniper.
    for vendor in ('arista', 'juniper'):
        fetch_netbox(vendor)
    # Dump everything that was gathered.
    pp.pprint(optics_list)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment