Skip to content

Instantly share code, notes, and snippets.

@brett-kimball
Last active December 30, 2024 07:41
Show Gist options
  • Save brett-kimball/356b79a33383eab9a855455bdc3174c0 to your computer and use it in GitHub Desktop.
Save brett-kimball/356b79a33383eab9a855455bdc3174c0 to your computer and use it in GitHub Desktop.
Unifi Access API - Batch Enroll Bulk NFC Cards
#!/usr/bin/env python3
import os, sys, requests, json, curses, time, signal, urllib3, subprocess, getopt
from filelock import FileLock, Timeout
""" Usage: batch_enroll_nfc.py [-h|-r "Reader Name"|-t ["NFC Card"|"NFC Keyfob"]]
This script opens a card enrollment session with the Unifi Access API on a card reader. As cards are
read, they are added to Unifi Access. Multiple cards can be read in a session, each will be assigned
a sequential numeric "display_id" by Access. The cards can then be assigned to users at a later date.
When all cards have been read, press Ctrl+C to terminate the enrollment session.
Requirements: {
python3: [
requests, # bin/pip install requests
filelock # bin/pip install filelock
]
environment: [
UNIFI_HOST="base URL of the Unifi Controller including the scheme, e.g. https://hostname-or-ip[:port]",
UNIFI_TOKEN="token from Access: Settings > Advanced > API Token"
] # set in ~/.profile
}
"""
""" create a usage function
"""
valid_nfc_types = ['NFC Card', 'NFC Keyfob']


def usage():
    """Print the command line synopsis and a one-line description of each option."""
    type_choices = '|'.join(valid_nfc_types)
    # synopsis line; the trailing newline in the literal leaves a blank line after it
    print(sys.argv[0], "[-h|-r reader|-t type]\n")
    option_help = (
        "\t-h, --help\tprint usage Help message",
        "\t-r, --reader\tspecify the name of the card Reader to use",
        f"\t-t, --type\tspecify the type of NFC devices you are enrolling [{type_choices}]",
    )
    for help_line in option_help:
        print(help_line)
""" Parse the command line options
"""
# Parse the command line. Any problem with the options (unknown flag, missing
# argument, unsupported NFC type) is reported the same way: a message, the
# usage text, and exit status 2.
try:
    opts, args = getopt.getopt(sys.argv[1:], "hr:t:", ["help", "reader=", "type="])
except getopt.GetoptError as err:
    # print help information and exit:
    print(err)  # will print something like "option -a not recognized"
    usage()
    sys.exit(2)

nfc_type = None      # stays None unless -t/--type is given
reader_name = None   # stays None unless -r/--reader is given
for o, a in opts:
    if o in ("-h", "--help"):
        usage()
        sys.exit()
    elif o in ("-r", "--reader"):
        reader_name = a
    elif o in ("-t", "--type"):
        if a not in valid_nfc_types:
            # user input error: report it like a getopt error, not a traceback
            print(f'unsupported NFC type "{a}"')
            usage()
            sys.exit(2)
        nfc_type = a
    else:
        # getopt only returns options named above, so this is a programming
        # error; raise explicitly because `assert` is stripped under -O
        raise RuntimeError(f"unhandled option {o}")
""" set a polling rate
"""
rate = 0.5 # seconds

# lockfile path; the script takes a file lock on it before doing any work
lockfile = '/var/tmp/bulk_enroll_nfc.lock'

# get unifi API connection parameters from environment (in ~/.profile)
host = os.getenv('UNIFI_HOST')
token = os.getenv('UNIFI_TOKEN')

# fail fast with a clear message; otherwise the first API call would build a
# URL beginning with "None" and die with a confusing "Invalid URL" error
_missing_env = [
    env_name
    for env_name, env_value in (('UNIFI_HOST', host), ('UNIFI_TOKEN', token))
    if not env_value
]
if _missing_env:
    raise SystemExit(f"Missing required environment variable(s): {', '.join(_missing_env)}")

# host is used verbatim as the URL prefix ("{host}/api/..."), so drop a
# trailing slash to avoid producing "//api/..." paths
host = host.rstrip('/')

# disable InsecureRequestWarning globally, SSL certs are not set up
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
""" set up a signal handler to catch SIGINT and terminate the session
"""
def signal_handler(sig, frame):
    """SIGINT handler: delete the active enrollment session, then exit.

    Reads the module-level ``host``, ``token`` and ``session_id``. The
    arguments are the standard signal-handler parameters and are not used.
    """
    cprint(f"\nTerminating Session: {session_id}...", end=" ")
    response = delete_session(host, token, session_id=session_id)
    if response.get('error'):
        # delete_session reports HTTP failures via an "error" key; show it in
        # red instead of printing a green "None"
        cprint(response.get('error'), "1")
    else:
        cprint(response.get('code'), "2")
    sys.exit()
""" define Utility functions
"""
def jprint(data=None, sort_keys=False):
    """Pretty-print *data* as JSON with a 4-space indent.

    Keys are sorted only when *sort_keys* is true.
    """
    rendered = json.dumps(data, sort_keys=sort_keys, indent=4, separators=(',', ': '))
    print(rendered)
def cprint(text=None, af="4", end=None):
    """Print *text* in colour by bracketing it with ``tput`` calls.

    Args:
        text: value to print.
        af: colour number handed to ``tput setaf``
            (1=red, 2=green, 3=yellow, 4=blue). May be a str or an int.
        end: forwarded to ``print`` (None means the usual newline).
    """
    def _tput(*args):
        # Colour is cosmetic: if tput cannot be started, carry on with plain
        # text. str() lets callers pass the colour as an int; subprocess
        # rejects non-string arguments.
        try:
            subprocess.run(["tput", *(str(arg) for arg in args)])
        except OSError:
            pass

    # tput writes straight to the terminal while print() is buffered, so flush
    # on both sides to keep the text between the colour-on and colour-off
    # sequences
    sys.stdout.flush()
    _tput("setaf", af)              # turn on color
    print(text, end=end, flush=True)
    _tput("sgr0")                   # turn off color
def clear_screen():
    """Clear the terminal; do nothing when stdout is not a tty."""
    if not sys.stdout.isatty():
        return
    curses.setupterm()
    # emit the 'E3' capability followed by 'clear'; a capability the terminal
    # lacks contributes nothing
    sequence = b''.join(curses.tigetstr(capability) or b'' for capability in ('E3', 'clear'))
    os.write(sys.stdout.fileno(), sequence)
def prompt_select(prompt=None, selections=None):
    """Ask the user to pick one entry from *selections* and return it.

    The numbered menu is shown again after every invalid answer.

    Args:
        prompt: text displayed above the menu.
        selections: sequence of choices; listed numbered from 1.

    Returns:
        The chosen element of *selections*.

    Raises:
        SystemExit: when the user answers "q".
    """
    while True:
        cprint(f"\n{prompt} (q to quit)", "3")
        for count, element in enumerate(selections, 1):
            cprint(f"\t{count}: {element}", "3")
        # tolerate stray whitespace around the answer
        selection = input("Enter a number: ").strip()
        if selection == "q":
            raise SystemExit("Cancelled.")
        # isdecimal(), not isnumeric(): isnumeric() also accepts characters
        # such as "²" that int() cannot convert
        if selection.isdecimal() and 1 <= int(selection) <= len(selections):
            return selections[int(selection) - 1]
        print(f"Invalid selection {selection}, try again.")
        time.sleep(1)
""" define Unifi API functions
"""
def fetch_devices(host, token, timeout=10):
    """Fetch the device list from the Access API.

    Args:
        host: URL prefix of the controller; used verbatim before "/api/...".
        token: API token sent as a Bearer Authorization header.
        timeout: seconds to wait for the server before giving up, so an
            unresponsive controller cannot hang the script.

    Returns:
        The decoded JSON body on HTTP 200, otherwise a dict with an "error"
        message and an empty "data" list.

    Raises:
        SystemExit: on any requests exception (connection failure, timeout, ...).
    """
    url = f"{host}/api/v1/developer/devices"
    headers = {
        "Authorization": f"Bearer {token}",
        "accept": "application/json",
        "content-type": "application/json"
    }
    try:
        # verify=False: TLS certificate verification is intentionally skipped
        response = requests.get(url, headers=headers, verify=False, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        return {"error": f"Failed to fetch devices, HTTP Status Code: {response.status_code}", "data": []}
    except requests.exceptions.RequestException as e:
        raise SystemExit(e)
def fetch_cards(host, token, card_id=None, page_num=1, page_size=1000, timeout=10):
    """Fetch NFC cards from the Access API: one page of cards, or a single card.

    Args:
        host: URL prefix of the controller; used verbatim before "/api/...".
        token: API token sent as a Bearer Authorization header.
        card_id: when given, fetch only this card (appended to the URL path).
        page_num: page to request when listing cards.
        page_size: number of cards per page when listing cards.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body on HTTP 200, otherwise a dict with an "error"
        message and an empty "data" list.

    Raises:
        SystemExit: on any requests exception (connection failure, timeout, ...).
    """
    url = f"{host}/api/v1/developer/credentials/nfc_cards/tokens"
    headers = {
        "Authorization": f"Bearer {token}",
        "accept": "application/json",
        "content-type": "application/json"
    }
    query_params = None
    if card_id:
        url = url + '/' + card_id
    else:
        # pagination only applies to the list request
        query_params = {
            "page_num": page_num,
            "page_size": page_size
        }
    try:
        # Pagination goes in the query string (params=). Passing it as json=
        # puts it in the body of the GET request instead of the URL.
        response = requests.get(url, headers=headers, params=query_params, verify=False, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        return {"error": f"Failed to fetch cards, HTTP Status Code: {response.status_code}", "data": []}
    except requests.exceptions.RequestException as e:
        raise SystemExit(e)
def enroll_card(host, token, device_id=None, reset_ua_card=False, timeout=10):
    """Open an NFC card enrollment session on a device via the Access API.

    Args:
        host: URL prefix of the controller; used verbatim before "/api/...".
        token: API token sent as a Bearer Authorization header.
        device_id: id of the device to enroll on; sent in the JSON body.
        reset_ua_card: sent as-is in the JSON body.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body on HTTP 200, otherwise a dict with an "error"
        message and an empty "data" dict.

    Raises:
        SystemExit: on any requests exception (connection failure, timeout, ...).
    """
    url = f"{host}/api/v1/developer/credentials/nfc_cards/sessions"
    headers = {
        "Authorization": f"Bearer {token}",
        "accept": "application/json",
        "content-type": "application/json"
    }
    payload = {
        "device_id": device_id,
        "reset_ua_card": reset_ua_card
    }
    try:
        # verify=False: TLS certificate verification is intentionally skipped
        response = requests.post(url, headers=headers, json=payload, verify=False, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        return {"error": f"Failed to enroll card, HTTP Status Code: {response.status_code}", "data": {}}
    except requests.exceptions.RequestException as e:
        raise SystemExit(e)
def fetch_status(host, token, session_id=None, timeout=10):
    """Fetch the status of an enrollment session from the Access API.

    Args:
        host: URL prefix of the controller; used verbatim before "/api/...".
        token: API token sent as a Bearer Authorization header.
        session_id: id of the enrollment session to query.
        timeout: seconds to wait for the server before giving up, so one
            stalled request cannot freeze a polling caller indefinitely.

    Returns:
        The decoded JSON body on HTTP 200, otherwise a dict with an "error"
        message and an empty "data" dict.

    Raises:
        SystemExit: on any requests exception (connection failure, timeout, ...).
    """
    url = f"{host}/api/v1/developer/credentials/nfc_cards/sessions/{session_id}"
    headers = {
        "Authorization": f"Bearer {token}",
        "accept": "application/json",
        "content-type": "application/json"
    }
    try:
        # verify=False: TLS certificate verification is intentionally skipped
        response = requests.get(url, headers=headers, verify=False, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        return {"error": f"Failed to fetch status, HTTP Status Code: {response.status_code}", "data": {}}
    except requests.exceptions.RequestException as e:
        raise SystemExit(e)
def delete_session(host, token, session_id=None, timeout=10):
    """Delete an enrollment session via the Access API.

    Args:
        host: URL prefix of the controller; used verbatim before "/api/...".
        token: API token sent as a Bearer Authorization header.
        session_id: id of the enrollment session to delete.
        timeout: seconds to wait for the server before giving up, so a
            stalled request cannot block the caller forever.

    Returns:
        The decoded JSON body on HTTP 200, otherwise a dict with an "error"
        message and an empty "data" dict.

    Raises:
        SystemExit: on any requests exception (connection failure, timeout, ...).
    """
    url = f"{host}/api/v1/developer/credentials/nfc_cards/sessions/{session_id}"
    headers = {
        "Authorization": f"Bearer {token}",
        "accept": "application/json",
        "content-type": "application/json"
    }
    try:
        # verify=False: TLS certificate verification is intentionally skipped
        response = requests.delete(url, headers=headers, verify=False, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        return {"error": f"Failed to delete session, HTTP Status Code: {response.status_code}", "data": {}}
    except requests.exceptions.RequestException as e:
        raise SystemExit(e)
# End function defs
""" Begin Script
"""
# Script body: take the lock, choose a reader, open an enrollment session and
# poll it until the user presses Ctrl+C (handled by signal_handler).
lock = FileLock(lockfile)
try:
    # timeout 0: do not wait if the lock is already held
    held_lock = lock.acquire(0)
except Timeout:
    raise SystemExit(f"Could not acquire {lockfile}; is another enrollment already running?") from None

with held_lock:
    clear_screen()

    if not nfc_type:
        # if the nfc_type was not specified by command line option, allow the user to select
        # NOTE(review): nfc_type is not referenced again in this script
        nfc_type = prompt_select(prompt="Select the type of NFC devices you will be enrolling.", selections=valid_nfc_types)

    # fetch the device list once; it feeds both the reader menu and the id lookup
    response = fetch_devices(host, token)
    if response.get('error'):
        raise SystemExit(response.get('error'))
    # 'data' is walked as a list of device groups, each a list of device dicts
    devices = [device for group in response.get('data') or [] for device in group]

    if not reader_name:
        # if the reader_name was not specified by a command line option, allow the user to select
        reader_name = prompt_select(
            prompt="Select the Card Reader you will use for enrollment.",
            selections=[device.get('name') for device in devices],
        )

    # get all existing cards, save their truncated ids in a list
    # NOTE: truncating limits max card count to 99999, our cards
    #       are numbered sequentially from 0001 to 9999 so nbd
    response = fetch_cards(host, token)
    if response.get('error'):
        raise SystemExit(response.get('error'))
    all_card_ids = []
    for card in response.get('data') or []:
        existing_id = card.get('display_id')
        # skip cards whose display_id is missing or not purely digits instead
        # of crashing in int()
        if isinstance(existing_id, str) and existing_id.isdecimal():
            all_card_ids.append(int(existing_id[-5:]))
    if not all_card_ids:
        all_card_ids = [0]  # so max() + 1 below starts the numbering at 1

    # get the device id of the card enrollment reader
    clear_screen()
    device_id = None
    for device in devices:
        if device.get('name') == reader_name:
            device_id = device.get('id')
    cprint(f"\nReader: {reader_name}, Device ID: {device_id}, Polling Rate: {rate:01} seconds")
    if not device_id:
        raise SystemExit(f'Reader Device "{reader_name}" not found.\n')

    # create a session to enroll cards, multiple cards can be enrolled in a single session
    response = enroll_card(host, token, device_id=device_id)
    # session_id stays a module-level name: signal_handler reads it
    session_id = (response.get('data') or {}).get('session_id')
    if not session_id:
        raise SystemExit(f'No Session ID Returned:\n{response}')
    cprint(f'NFC Enrollment Session Initiated, ID: {session_id}\n')

    # init the signal handler to catch CTRL+C
    signal.signal(signal.SIGINT, signal_handler)

    cprint(f'Please scan the cards in sequential order beginning with {max(all_card_ids) + 1 :04}.')
    cprint('Press "Ctrl+C" to end the session.\n', "3")
    # flush: the line has no newline, so it would otherwise sit in the buffer
    print(f'{"Awaiting:": >12} {max(all_card_ids) + 1 :04}', end=' ', flush=True)

    # throbber state, to demonstrate the loop is functioning
    t = 0
    t_chars = '.oO0Oo.'
    old_card_token = None

    while True:
        # loop checking the status of the last card scanned, terminate with SIGINT
        new_card_token = None

        # NOTE: loop iteration rate is 3x the desired fetch rate to make the throbber look cool
        #       only execute the fetch on every third iteration of the loop
        if t % 3 == 0:
            response = fetch_status(host, token, session_id=session_id)
            if response.get('error'):
                print(response.get('error'))
            # if the response contains a data field, a card has been read
            elif response.get('data'):
                new_card_token = response.get('data').get('token')

        # if new card data has been returned by the fetch...
        if new_card_token and new_card_token != old_card_token:
            # fetch more information about the card, particularly its display_id
            response = fetch_cards(host, token, card_id=new_card_token)
            if response.get('error'):
                # old_card_token is left unchanged so the lookup is retried on
                # the next poll; the error payload carries no card data
                print(response.get('error'))
            else:
                old_card_token = new_card_token
                display_id = (response.get('data') or {}).get('display_id')
                if not (isinstance(display_id, str) and display_id.isdecimal()):
                    cprint(f'\r{"Error:": >12} Unknown display_id {display_id}{".": <7}', "3")
                else:
                    display_id = int(display_id[-5:])
                    if display_id in all_card_ids:
                        cprint(f'\r{"Skipped:": >12} {display_id :04}{", previously enrolled card.": <27}', "1")
                    else:
                        all_card_ids.append(display_id)
                        cprint(f'\r{"Enrolled:": >12} {display_id :04}{".": <27}', "2")
                t = 0 # reset throbber
                print(f'{"Awaiting:": >12} {max(all_card_ids) + 1 :04}', end=' ', flush=True)
        else:
            # update the throbber on screen; flush because there is no newline
            print(f'\r{t_chars[t % len(t_chars)]} ', end="", flush=True)

        # sleep between loop iterations
        # NOTE: rate is divided by three to increase throbber speed
        time.sleep(rate / 3)
        # increment throbber counter
        t += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment