Last active
June 14, 2023 14:56
-
-
Save mvanderlee/c6fb0bf7611565e43813bfa503f2afc1 to your computer and use it in GitHub Desktop.
Paramiko's interactive demo with support for control characters in posix.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib
import logging
import os
from binascii import hexlify
from dataclasses import dataclass
from typing import Optional

import paramiko
import questionary

from interactive_ssh import interactive_shell
class ConfirmAddPolicy(paramiko.client.MissingHostKeyPolicy):
    """
    Policy for asking the user whether they want to add the hostname and new host key to
    the local `.HostKeys` object before saving it. This is used by `.SSHClient`.
    """

    def missing_host_key(self, client, hostname, key):
        """
        Ask the user about an unknown host key and either store it or abort.

        :param client: the `.SSHClient` that received the unknown key
        :param hostname: name of the server that presented the key
        :param key: the host key presented by the server
        :raises paramiko.SSHException: if the user declines to add the key
        """
        log = logging.getLogger(__name__)

        # hexlify() returns bytes; decode it so the message shows "ab12..." instead of
        # the bytes repr "b'ab12...'"
        fingerprint = hexlify(key.get_fingerprint()).decode("ascii")
        log.warning("Unknown %s host key for %s: %s", key.get_name(), hostname, fingerprint)

        should_add = questionary.confirm("Continue and add host key?").unsafe_ask()
        if not should_add:
            raise paramiko.SSHException(
                "Server {!r} not found in known_hosts".format(hostname)
            )

        # The key is always added to the in-memory host keys of this client
        client._host_keys.add(hostname, key.get_name(), key)

        # It can only be persisted when the client knows which file to write to
        if client._host_keys_filename is not None:
            client.save_host_keys(client._host_keys_filename)
            log.info('Added host key')
        else:
            log.warning('Failed to add host key. No host key file defined!')
@dataclass
class SSHShell:
    '''
    Interactive SSHShell that can be used as a contextmanager.

    Used directly, `connect()` opens the session, hands the terminal to the user and
    closes the connection afterwards. Used in a `with` statement, the body receives the
    channel (to send data up front); the interactive shell starts when the body finishes
    without an exception, and the connection is always closed on exit.
    '''
    hostname: str
    username: str
    key_filename: Optional[str] = None

    def connect(self):
        '''Open the connection, run the interactive shell, then close the connection.'''
        self._open()
        try:
            self._launch()
        finally:
            # Close the connection even if the interactive session raised
            self._close()

    def _open(self):
        '''
        Connect to the host and start a shell on a pty sized like the local terminal.

        Returns the channel, which is also stored on the instance together with the
        client.
        '''
        # Local import: keeps this class independent of the file's import block
        import shutil

        # Falls back to 80x24 when no terminal is attached instead of failing
        terminal = shutil.get_terminal_size(fallback=(80, 24))

        # Expand '~' explicitly rather than relying on the SSH library to do it.
        # Non-string values (e.g. a list of paths) are passed through untouched.
        key_filename = self.key_filename
        if isinstance(key_filename, str):
            key_filename = os.path.expanduser(key_filename)

        ssh_client = paramiko.SSHClient()
        try:
            # Set hosts key path so we can save to it
            ssh_client.load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))
            ssh_client.set_missing_host_key_policy(ConfirmAddPolicy())
            ssh_client.connect(
                hostname=self.hostname,
                username=self.username,
                key_filename=key_filename,
            )

            channel = ssh_client.get_transport().open_session()
            channel.get_pty(
                term=os.getenv('TERM', 'xterm-256color'),
                width=terminal.columns,
                height=terminal.lines,
            )
            channel.invoke_shell()
        except BaseException:
            # Do not leave a half-open client behind when setup fails or is interrupted
            ssh_client.close()
            raise

        self._ssh_client = ssh_client
        self._channel = channel

        return channel

    def _launch(self):
        '''Hand the opened channel over to the interactive shell.'''
        interactive_shell(self._channel)

    def _close(self):
        '''Close the SSH client created by `_open`.'''
        self._ssh_client.close()

    def __enter__(self):
        return self._open()

    def __exit__(self, exception_type, exception_value, traceback):
        try:
            # Only give control to the user when the with-body succeeded;
            # after a failure the exception simply propagates.
            if exception_type is None:
                self._launch()
        finally:
            self._close()
# Define ssh args
ip = '127.0.0.1'
username = 'ubuntu'
# Expand '~' explicitly rather than relying on the SSH library to do it
key_file = os.path.expanduser('~/.ssh/id_rsa')

# Simply open the interactive shell
SSHShell(hostname=ip, username=username, key_filename=key_file).connect()

# Open the interactive shell and send some commands before giving control to the user
with SSHShell(hostname=ip, username=username, key_filename=key_file) as shell:
    # Each command ends with a newline so the remote shell actually executes it
    shell.send('cd /app\n')
    shell.send('docker logs my-container --since 5m\n')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Source: https://github.com/paramiko/paramiko/blob/main/demos/interactive.py | |
# Copyright (C) 2003-2007 Robey Pointer <[email protected]> | |
# | |
# This file is part of paramiko. | |
# | |
# Paramiko is free software; you can redistribute it and/or modify it under the | |
# terms of the GNU Lesser General Public License as published by the Free | |
# Software Foundation; either version 2.1 of the License, or (at your option) | |
# any later version. | |
# | |
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY | |
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR | |
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more | |
# details. | |
# | |
# You should have received a copy of the GNU Lesser General Public License | |
# along with Paramiko; if not, write to the Free Software Foundation, Inc., | |
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
import socket | |
import sys | |
import paramiko | |
# Markers a terminal in bracketed paste mode puts around pasted text, see
# https://invisible-island.net/xterm/ctlseqs/ctlseqs.html#h2-Bracketed-Paste-Mode
START_PASTE = "\x1b[200~"  # ESC[200~
END_PASTE = "\x1b[201~"  # ESC[201~

# termios and tty are missing on windows; record whether they could be imported
try:
    import termios
    import tty
except ImportError:
    has_termios = False
else:
    has_termios = True
def is_int(val: str) -> bool:
    """Return True when ``int(val)`` succeeds, False when it raises."""
    try:
        int(val)
    except Exception:
        return False
    return True
def interactive_shell(chan: paramiko.Channel):
    """Run the interactive loop for ``chan``.

    The posix implementation is used when termios could be imported, the windows
    implementation otherwise.
    """
    run_shell = posix_shell if has_termios else windows_shell
    run_shell(chan)
def posix_readkey() -> str:
    """Read one keypress from stdin, including the whole escape sequence if the
    key starts one.

    Copied from readchar:
    https://github.com/magmax/python-readchar/blob/master/readchar/_posix_read.py#L30
    """
    read = sys.stdin.read

    sequence = read(1)
    if sequence != "\x1B":  # not ESC, a plain character
        return sequence

    # For the 2nd, 3rd and 4th character: the characters that mean the escape
    # sequence continues. Anything else ends the sequence right there.
    continuations = (
        "\x4F\x5B",  # O[
        "\x31\x32\x33\x35\x36",  # 12356
        "\x30\x31\x33\x34\x35\x37\x38\x39",  # 01345789
    )
    for allowed in continuations:
        char = read(1)
        sequence += char
        if char not in allowed:
            return sequence

    sequence += read(1)

    # Bracketed Paste Mode markers are six characters long, so one more read is needed:
    # https://invisible-island.net/xterm/ctlseqs/ctlseqs.html#h2-Bracketed-Paste-Mode
    if sequence in (START_PASTE[:-1], END_PASTE[:-1]):
        sequence += read(1)

    return sequence
def windows_readkey() -> str:
    """Reads the next keypress. If an escaped key is pressed, the full
    sequence is read and returned. An empty string is returned when stdin
    is at EOF.

    Copied from readchar:
    https://github.com/magmax/python-readchar/blob/master/readchar/_win_read.py#LL14C1-L30C24
    """
    ch = sys.stdin.read(1)

    # EOF: "" must be handled before the prefix check, otherwise it would be treated
    # as a special key and "\x00" would be returned instead of an empty string
    if not ch:
        return ch

    # if it is a normal character:
    if ch not in ("\x00", "\xe0"):
        return ch

    # if it is a special key, read second half:
    ch2 = sys.stdin.read(1)

    # both prefixes are reported as "\x00"
    return "\x00" + ch2
def posix_shell(chan: paramiko.Channel):  # noqa: C901
    """Relay data between the local terminal and ``chan`` until either side ends.

    Output received from the channel is written to stdout; keys read from stdin
    (including full escape sequences and pasted text) are sent to the channel.
    The terminal attributes of stdin are restored when the loop ends.
    """
    import codecs
    import select

    # Incremental decoding keeps a multi-byte character intact when its bytes arrive
    # in two separate recv() calls; undecodable bytes are replaced instead of raising.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    oldtty = termios.tcgetattr(sys.stdin)

    # input_history = []

    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)

        while True:
            readable, _, _ = select.select([chan, sys.stdin], [], [])

            if chan in readable:
                try:
                    data = chan.recv(1024)
                    # Check the raw bytes: an empty read means the channel is closed,
                    # while decoded text may be empty for a partial character
                    if len(data) == 0:
                        sys.stdout.write("\r\n")
                        break
                    sys.stdout.write(decoder.decode(data))
                    sys.stdout.flush()
                except socket.timeout:
                    pass

            if sys.stdin in readable:
                key = posix_readkey()

                # When pasting something, we need to read the entire pasted blob at once
                # Otherwise it'll hang until the next key press.
                # This has to do with how 'select.select' detects changes.
                # A paste is a single event of many characters, so we must handle them all as one event
                if key == START_PASTE:
                    # Start reading the pasted text
                    key = posix_readkey()
                    # Until we reach the end of the pasted text (or stdin runs dry)
                    while key and key != END_PASTE:
                        chan.send(key)
                        # input_history.append(key)
                        key = posix_readkey()
                    if not key:
                        # stdin reached EOF before the end marker; stop instead of
                        # reading empty strings forever
                        break
                    # We've exhausted the paste event, wait for next event
                    continue

                if len(key) == 0:
                    break

                chan.send(key)
                # input_history.append(key)
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)

    # Useful in debugging how control characters were send
    # print(input_history)
# thanks to Mike Looijmans for this code
def windows_shell(chan: paramiko.Channel):
    """Relay data between the console and ``chan`` without termios.

    A background thread writes everything received from the channel to stdout,
    while this function reads keys from stdin and sends them to the channel
    until stdin reports EOF.
    """
    import codecs
    import threading

    sys.stdout.write(
        "Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n"
    )

    def writeall(sock):
        # recv() returns bytes, which a text stdout cannot write; decode incrementally
        # so a character split across two reads is still printed correctly
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write("\r\n*** EOF ***\r\n\r\n")
                sys.stdout.flush()
                break
            sys.stdout.write(decoder.decode(data))
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = windows_readkey()
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sourced from: https://github.com/paramiko/paramiko/blob/main/demos/interactive.py
I've only added control character support for posix.