Created
March 13, 2023 02:48
-
-
Save chrismeyersfsu/1514143205356e2f2255ae83bbb8c9c6 to your computer and use it in GitHub Desktop.
Vpn-creds
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python3.8 | |
import socket | |
import select | |
import fcntl | |
import os | |
from collections.abc import Generator | |
import sys | |
import pyotp | |
import time | |
READ_SIZE = 1024 | |
USERNAME = 'vpn_username_here' | |
TOTP_TOKEN = 'your_token_secret_here' | |
SELECT_TIMEOUT = 1000 | |
DEBUG_LEVELS = { | |
'info': 1, | |
'debug': 2, | |
} | |
DEBUG_LEVEL = DEBUG_LEVELS['info'] | |
def printd(*args, **kwargs): | |
if DEBUG_LEVEL >= DEBUG_LEVELS['debug']: | |
print(*args, **kwargs) | |
def printi(*args, **kwargs): | |
if DEBUG_LEVEL >= DEBUG_LEVELS['info']: | |
print(*args, **kwargs) | |
""" | |
Connect to openvpn client management socket, read messages until it finds an | |
auth request; reply with username and generated totp token. | |
Disconnect if nothing interesting happens and reconnect | |
The constant disconnect/reconnect is to work around the fact that pfSense + openvpn | |
only supports 1 user on the management socket at a time. The pfSense system gets the | |
status of the connection via this socket. Our disconnect/connect looping allows for | |
the status to take a turn. | |
""" | |
class messages(Generator): | |
def __init__(self, sock, timeout): | |
self.sock = sock | |
self.timeout = timeout | |
self.data = b'' | |
self.msgs = [] | |
def read_msg(self): | |
inputs = [self.sock] | |
#readable, writable, exceptional = select.select(inputs, [], inputs, 2.0) | |
fd_to_sock = { self.sock.fileno(): self.sock } | |
poller = select.poll() | |
READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR | |
poller.register(self.sock, READ_ONLY) | |
events = poller.poll(self.timeout) | |
if not events: | |
printd("Timed out on select", file=sys.stderr) | |
raise RuntimeError(f"Timed out on select") | |
for fd, flag in events: | |
sock = fd_to_sock[fd] | |
if flag & (select.POLLIN | select.POLLPRI): | |
if sock is sock: | |
try: | |
res = sock.recv(READ_SIZE) | |
except socket.error as e: | |
err = e.args[0] | |
raise RuntimeError(f"sock recv() returned None, likely closed {err}") | |
if res == b'': | |
sock.close() | |
raise RuntimeError("sock recv() returned None, likely closed") | |
self.data += res | |
elif flag & (select.POLLHUP | select.POLLERR): | |
sock.close() | |
raise RuntimeError("socket failed during read") | |
def process_msgs(self): | |
while True: | |
try: | |
index = self.data.index(b'\n') | |
except ValueError: | |
break | |
self.msgs.append(self.data[0:index]) | |
self.data = self.data[index+1:] | |
def send(self, ignore): | |
while True: | |
if self.msgs: | |
return self.msgs.pop().decode() | |
try: | |
self.read_msg() | |
except RuntimeError as e: | |
printd(f"Exception in send() {e}", file=sys.stderr) | |
self.throw(f"Socket error {e}") | |
self.process_msgs() | |
def throw(self, type=None, value=None, traceback=None): | |
raise StopIteration | |
def run(openvpn_ctrl_path='/var/etc/openvpn/client1/sock'): | |
while True: | |
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
try: | |
sock.connect(openvpn_ctrl_path) | |
except socket.error as msg: | |
prindt(f"Failed to connect to {openvpn_ctrl_path}: {msg}", file=sys.stderr) | |
time.sleep(3) | |
continue | |
printd("Connected, entering msg loop", file=sys.stderr) | |
sock.setblocking(0) | |
for msg in messages(sock, SELECT_TIMEOUT): | |
printd(f"Message: {msg}") | |
if msg.startswith(">PASSWORD:Need 'Auth' username/password"): | |
printi(f"Sending asked for credentials") | |
totp = pyotp.TOTP(TOTP_TOKEN) | |
token = totp.now() | |
need_type = 'Auth' | |
reply_suffix = '\n' | |
reply_user = f'username "{need_type}" {USERNAME}{reply_suffix}' | |
reply_pass = f'password "{need_type}" {token}{reply_suffix}' | |
sock.sendall(f'{reply_user}{reply_pass}'.encode()) | |
printd("Disconnected", file=sys.stderr) | |
# Don't reconnect. This works well with cron running this in a loop | |
break | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment