Created
November 3, 2018 18:26
-
-
Save f1u77y/9602b8b6fc8768f287c6415bfa74294b to your computer and use it in GitHub Desktop.
nc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import re | |
from typing import Union, Optional, Callable, List, ByteString | |
from typing.re import Pattern | |
class Colors(object):
    """ANSI escape sequences and helpers for colouring terminal text."""

    BLUE = '\033[94m'
    GREEN = '\033[92m'
    ENDC = '\033[0m'

    @classmethod
    def blue(cls, s: str) -> str:
        """Return *s* wrapped in the blue escape code.

        A falsy *s* (for example the empty string) is returned unchanged so
        that no bare escape sequences are produced.
        """
        return '{}{}{}'.format(cls.BLUE, s, cls.ENDC) if s else s

    @classmethod
    def green(cls, s: str) -> str:
        """Return *s* wrapped in the green escape code.

        A falsy *s* (for example the empty string) is returned unchanged so
        that no bare escape sequences are produced.
        """
        return '{}{}{}'.format(cls.GREEN, s, cls.ENDC) if s else s
class Netcat(object):
    """Small netcat-style TCP client with optional echoing of the traffic.

    Incoming bytes are decoded as UTF-8.  Echo targets are either ``'-'``
    (standard output, coloured) or file names (appended to, uncoloured).

    Instances can be used as context managers; the socket is closed on exit.
    """

    def __init__(self, hostname: str, port: int,
                 echo: Optional[List[str]] = None):
        """Open an IPv4 TCP connection to ``hostname:port``.

        Args:
            hostname: Host to connect to.
            port: TCP port to connect to.
            echo: Echo targets (see :meth:`echo`); ``None`` disables echoing.

        Raises:
            OSError: If the connection cannot be established.
        """
        self._hostname = hostname
        self._port = port
        self._lines = []
        self._echo = self._normalize_echo(echo)
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.connect((self._hostname, self._port))
        except BaseException:
            # Do not leak the descriptor when the connection attempt fails.
            self._socket.close()
            raise

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the socket when the ``with`` block is left."""
        self.close()

    def close(self) -> None:
        """Close the underlying socket."""
        self._socket.close()

    @staticmethod
    def _normalize_echo(echo) -> List[str]:
        """Turn any accepted echo specification into a list of targets."""
        if echo is None:
            return []
        if isinstance(echo, bool):
            return ['-'] if echo else []
        if isinstance(echo, str):
            # A lone string is one target, not an iterable of characters.
            return [echo]
        return list(echo)

    def _refill_lines(self) -> bool:
        """Replace the line buffer with the next chunk; ``False`` on EOF."""
        chunk = self.read()
        if not chunk:
            return False
        self._lines = chunk.split('\n')
        return True

    def read(self, blocksize: int = 2048) -> str:
        """Receive up to *blocksize* bytes and return them decoded as UTF-8.

        The received text is echoed.  An empty string is returned once the
        peer has closed the connection.

        NOTE: each chunk is decoded on its own, so a multi-byte character
        split across two chunks raises ``UnicodeDecodeError``.
        """
        buf = self._socket.recv(blocksize).decode('utf-8')
        self.print_recv(buf)
        return buf

    def skip_until(self, pred: Union[Callable, str, Pattern]) -> None:
        """Discard buffered/received lines until one satisfies *pred*.

        The matching line is left at the front of the line buffer.

        Args:
            pred: A callable taking a line and returning a truthy value on a
                match, or a regular expression (string or compiled) that is
                applied with ``match`` semantics (anchored at line start).

        Raises:
            ValueError: If *pred* is none of the supported kinds.
            EOFError: If the connection is closed before a line matches.
        """
        if callable(pred):
            matches = pred
        elif isinstance(pred, (str, Pattern)):
            # Compile once instead of on every line; an already compiled
            # pattern is returned unchanged by re.compile.
            matches = re.compile(pred).match
        else:
            raise ValueError('Unknown type of predicate')

        while True:
            for index, line in enumerate(self._lines):
                if matches(line):
                    self._lines = self._lines[index:]
                    return
            self._lines = []
            if not self._refill_lines():
                raise EOFError('connection closed before a line matched')

    def read_lines(self, limit: Optional[int] = None) -> List[str]:
        """Return lines from the line buffer.

        Args:
            limit: ``None`` returns (and clears) whatever is buffered without
                touching the socket.  Otherwise up to *limit* lines are
                returned, receiving more data when the buffer runs dry.

        Returns:
            The requested lines; fewer than *limit* if the connection was
            closed first.
        """
        if limit is None:
            lines, self._lines = self._lines, []
            return lines

        lines = []
        while len(lines) < limit:
            if not self._lines and not self._refill_lines():
                # EOF: hand back what we have instead of padding with ''.
                break
            missing = limit - len(lines)
            lines += self._lines[:missing]
            self._lines = self._lines[missing:]
        return lines

    def read_until(self, end='\n'):
        """Read directly from the socket until *end* has been received.

        The line buffer used by :meth:`skip_until` / :meth:`read_lines` is
        bypassed.  The received text (including *end*) is echoed.

        Args:
            end: Terminator string; may be longer than one character.

        Returns:
            The text received before *end* (terminator not included).  If the
            connection closes first, the partial text received so far.

        Raises:
            EOFError: If the connection is closed and nothing was received.
        """
        terminator = end.encode('utf-8')
        data = bytearray()
        # Work on bytes and decode once at the end so that multi-byte UTF-8
        # characters arriving one byte at a time are handled correctly.
        while not data.endswith(terminator):
            byte = self._socket.recv(1)
            if not byte:
                if not data:
                    raise EOFError('connection closed')
                partial = data.decode('utf-8')
                self.print_recv(partial)
                return partial
            data += byte
        text = data[:len(data) - len(terminator)].decode('utf-8')
        self.print_recv(text + end)
        return text

    def read_line(self) -> str:
        """Read one ``'\\n'``-terminated line (newline stripped)."""
        return self.read_until('\n')

    def write(self, line: Union[int, str, ByteString]) -> None:
        """Send *line* followed by exactly one newline.

        Args:
            line: Text (encoded as UTF-8), an integer (sent as its decimal
                representation) or raw bytes.

        Raises:
            TypeError: If *line* is of an unsupported type.
        """
        if isinstance(line, (bytes, bytearray, memoryview)):
            echo = repr(line)
            # bytes(...) copies, so a caller-owned bytearray is not mutated.
            payload = bytes(line) + b'\n'
        elif isinstance(line, (int, str)):
            echo = str(line) + '\n'
            payload = echo.encode('utf-8')
        else:
            raise TypeError(
                'line must be int, str or bytes-like, not '
                + type(line).__name__)
        self.print_write(echo)
        # sendall keeps sending until the whole payload is out; plain send
        # may transmit only a prefix.
        self._socket.sendall(payload)

    def echo(self, echo: Union[bool, str, List[str], None]) -> None:
        """Select where traffic is echoed.

        Args:
            echo: ``True`` echoes to standard output, ``False``/``None``
                disables echoing, a string is a single target and a list
                gives several targets.  The target ``'-'`` means standard
                output; any other target is a file name to append to.
        """
        self._echo = self._normalize_echo(echo)

    def print_modified(self, s: str, modify: Callable) -> None:
        """Echo *s* to every configured target.

        Args:
            s: Text to echo.
            modify: Transformation (e.g. colouring) applied only to the copy
                printed on standard output; files receive *s* unchanged.
        """
        for file_name in self._echo:
            if file_name == '-':
                print(modify(s), end='')
            else:
                with open(file_name, 'a', encoding='utf-8') as log:
                    print(s, end='', file=log)

    def print_recv(self, s: str) -> None:
        """Echo received text (blue on standard output)."""
        self.print_modified(s, Colors.blue)

    def print_write(self, s) -> None:
        """Echo sent text (green on standard output)."""
        self.print_modified(s, Colors.green)
# Names exported by a star-import of this module.
__all__ = ['Netcat', 'Colors']
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment