Last active
August 11, 2020 23:12
-
-
Save carlosperate/1dfcdc9823646e5983b92419ea13bdc1 to your computer and use it in GitHub Desktop.
Mu Serial REPL benchmark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from original import FakePaneOriginal | |
from option_1 import FakePaneOne | |
from option_2 import FakePaneTwo | |
from option_3 import FakePaneThree | |
from option_4 import FakePaneFour | |
from option_5 import FakePaneFive | |
# Benchmark payload: text mixing ASCII with 2-, 3- and 4-byte UTF-8
# characters, repeated so that it spans many chunks.
utf8_char_list = [
    "°",
    "🌈",
    "𩸽",
    "슰",
    # The comma after this entry was missing, which silently merged it with
    # the next element through implicit string concatenation.
    "jusll",
    "ё",
    "𠎝",
] * 12
# Control input appended after the text; none of it produces visible output.
vt100_cmd_list = [
    "\r",       # Return carriage, ignored
    "\b",       # Backspace, moves left
    "\x1B[C",   # VT100 Right
] * 18
# Number of bytes handed to process_bytes() per call.
bytes_per_loop = 20
# Expected visible output (the text only, without the control input).
utf8_str = "".join(utf8_char_list)
encoded_data = utf8_str.encode('utf-8') + "".join(vt100_cmd_list).encode('utf-8')
def test_0():
    """Run the benchmark payload through FakePaneOriginal in chunks.

    The payload is delivered ``bytes_per_loop`` bytes at a time. Unlike the
    other tests, this one asserts that the collected output DIFFERS from the
    source text.
    NOTE(review): presumably the baseline pane is expected to garble
    multi-byte characters - confirm against FakePaneOriginal.
    """
    pane = FakePaneOriginal()
    for offset in range(0, len(encoded_data), bytes_per_loop):
        chunk = encoded_data[offset:offset + bytes_per_loop]
        pane.process_bytes(chunk)
    assert pane.print_output != utf8_str
def test_1():
    """Run the benchmark payload through FakePaneOne in chunks.

    The payload is delivered ``bytes_per_loop`` bytes at a time and the
    collected output must equal the source text.
    """
    pane = FakePaneOne()
    for offset in range(0, len(encoded_data), bytes_per_loop):
        chunk = encoded_data[offset:offset + bytes_per_loop]
        pane.process_bytes(chunk)
    assert pane.print_output == utf8_str
def test_2():
    """Run the benchmark payload through FakePaneTwo in chunks.

    The payload is delivered ``bytes_per_loop`` bytes at a time and the
    collected output must equal the source text.
    """
    pane = FakePaneTwo()
    for offset in range(0, len(encoded_data), bytes_per_loop):
        chunk = encoded_data[offset:offset + bytes_per_loop]
        pane.process_bytes(chunk)
    assert pane.print_output == utf8_str
def test_3():
    """Run the benchmark payload through FakePaneThree in chunks.

    The payload is delivered ``bytes_per_loop`` bytes at a time and the
    collected output must equal the source text.
    """
    pane = FakePaneThree()
    for offset in range(0, len(encoded_data), bytes_per_loop):
        chunk = encoded_data[offset:offset + bytes_per_loop]
        pane.process_bytes(chunk)
    assert pane.print_output == utf8_str
def test_4():
    """Run the benchmark payload through FakePaneFour in chunks.

    The payload is delivered ``bytes_per_loop`` bytes at a time and the
    collected output must equal the source text.
    """
    pane = FakePaneFour()
    for offset in range(0, len(encoded_data), bytes_per_loop):
        chunk = encoded_data[offset:offset + bytes_per_loop]
        pane.process_bytes(chunk)
    assert pane.print_output == utf8_str
def test_5():
    """Run the benchmark payload through FakePaneFive in chunks.

    The payload is delivered ``bytes_per_loop`` bytes at a time and the
    collected output must equal the source text.
    """
    pane = FakePaneFive()
    for offset in range(0, len(encoded_data), bytes_per_loop):
        chunk = encoded_data[offset:offset + bytes_per_loop]
        pane.process_bytes(chunk)
    assert pane.print_output == utf8_str
if __name__ == '__main__':
    # Time every implementation with the same number of runs and print one
    # line per implementation once all measurements are done.
    import timeit

    setup = ("from __main__ import test_0, test_1, test_2, test_3, test_4, test_5\n")
    # (report line template, statement to time), in execution order.
    benchmarks = [
        ("Original 10k runs: {}\n", "test_0()"),
        ("Option 1 10k runs: {}\n", "test_1()"),
        ("Option 2 10k runs: {}\n", "test_2()"),
        ("Option 3 10k runs: {}\n", "test_3()"),
        ("Option 4 10k runs: {}\n", "test_4()"),
        ("Option 5 10k runs: {}\n", "test_5()"),
    ]
    report_lines = []
    for template, statement in benchmarks:
        elapsed = timeit.timeit(statement, setup=setup, number=10000)
        report_lines.append(template.format(elapsed))
    print("".join(report_lines))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
class QTextCursor:
    """Stand-in for the Qt ``QTextCursor`` constants used by the fake pane.

    The values are opaque markers: the fake pane's ``movePosition`` ignores
    its arguments, so only the attribute names matter.
    """

    Up = 'up'
    Down = 'down'
    Right = 'right'
    Left = 'left'
    # Referenced by the newline and "delete to end of line" code paths;
    # without them those paths raise AttributeError.
    End = 'end'
    EndOfLine = 'end_of_line'
    KeepAnchor = 'keep_anchor'


class FakePaneOne:
    """Fake REPL pane that decodes incoming UTF-8 one character at a time.

    The Qt text-widget methods are no-op stubs; text passed to
    ``insertPlainText`` is collected and exposed through ``print_output``.
    """

    def __init__(self):
        # Pieces of text "inserted" into the widget, in order.
        self.print_data = []
        # Bytes held back from the previous call (incomplete character or a
        # trailing lone <Esc>).
        self.previous_data = b""

    def insertPlainText(self, data):
        """Record *data* as text inserted into the widget."""
        self.print_data.append(data)

    @property
    def print_output(self):
        """All text inserted so far, joined into a single string."""
        return "".join(self.print_data)

    def setTextCursor(self, *args, **kwargs):
        """No-op stub."""
        pass

    def textCursor(self, *args, **kwargs):
        """Return the pane itself, which doubles as its own cursor."""
        return self

    def movePosition(self, *args, **kwargs):
        """No-op stub; always reports that the cursor did not move."""
        return False

    def removeSelectedText(self, *args, **kwargs):
        """No-op stub."""
        pass

    def deleteChar(self, *args, **kwargs):
        """No-op stub."""
        pass

    def ensureCursorVisible(self, *args, **kwargs):
        """No-op stub."""
        pass

    def position(self, *args, **kwargs):
        """Stub cursor position; always 0."""
        return 0

    # Inital test, tries to decode UTF-8 bytes one by one
    def process_bytes(self, data):
        """
        Given some incoming bytes of data, work out how to handle / display
        them in the REPL widget.

        A character whose bytes are split across two calls, or a lone <Esc>
        at the very end of *data*, is kept in ``self.previous_data`` and
        prepended to the next call.

        Raises ValueError if a byte cannot start a valid UTF-8 character.
        """
        tc = self.textCursor()
        # The text cursor must be on the last line of the document. If it isn't
        # then move it there.
        while tc.movePosition(QTextCursor.Down):
            pass
        if self.previous_data:
            data = self.previous_data + data
            self.previous_data = b""
        i = 0
        while i < len(data):
            if data[i] == 8:  # \b
                tc.movePosition(QTextCursor.Left)
                self.setTextCursor(tc)
            elif data[i] == 13:  # \r
                pass
            elif data[i] == 27 and len(data) == i + 1:
                # <Esc> is the last byte received: it may be the start of a
                # VT100 sequence, so keep it for the next call instead of
                # printing it.
                self.previous_data = data[i:]
                break
            elif len(data) > i + 1 and data[i] == 27 and data[i + 1] == 91:
                # VT100 cursor detected: <Esc>[
                i += 2  # move index to after the [
                # Match on the raw bytes: decoding the tail first would fail
                # when it ends in a truncated multi-byte character, and match
                # offsets would be in characters rather than bytes.
                regex = rb"(?P<count>[\d]*)(;?[\d]*)*(?P<action>[ABCDKm])"
                m = re.search(regex, data[i:])
                if m:
                    # move to (almost) after control seq
                    # (will ++ at end of loop)
                    i += m.end() - 1
                    count_bytes = m.group("count")
                    count = int(count_bytes) if count_bytes else 1
                    action = m.group("action")
                    if action == b"A":  # up
                        tc.movePosition(QTextCursor.Up, n=count)
                        self.setTextCursor(tc)
                    elif action == b"B":  # down
                        tc.movePosition(QTextCursor.Down, n=count)
                        self.setTextCursor(tc)
                    elif action == b"C":  # right
                        tc.movePosition(QTextCursor.Right, n=count)
                        self.setTextCursor(tc)
                    elif action == b"D":  # left
                        tc.movePosition(QTextCursor.Left, n=count)
                        self.setTextCursor(tc)
                    elif action == b"K":  # delete things
                        if not count_bytes:  # delete to end of line
                            tc.movePosition(
                                QTextCursor.EndOfLine,
                                mode=QTextCursor.KeepAnchor,
                            )
                            tc.removeSelectedText()
                            self.setTextCursor(tc)
            elif data[i] == 10:  # \n
                tc.movePosition(QTextCursor.End)
                self.setTextCursor(tc)
                self.insertPlainText(chr(data[i]))
            else:
                tc.deleteChar()
                self.setTextCursor(tc)
                # Received data is UTF-8 encoded, with up to 4 bytes per char
                utf8_extra_bytes = 0
                while utf8_extra_bytes <= 3:
                    if len(data) > i + utf8_extra_bytes:
                        utf8_bytes = bytes(data[i:i + 1 + utf8_extra_bytes])
                        try:
                            decoded_data = utf8_bytes.decode('utf-8')
                        except UnicodeDecodeError:
                            # Probably don't have all the character bytes yet
                            utf8_extra_bytes += 1
                        else:
                            self.insertPlainText(decoded_data)
                            i += utf8_extra_bytes
                            break
                    else:
                        # Ran out of bytes mid-character: keep the partial
                        # character for the next call.
                        self.previous_data = data[i:]
                        i += utf8_extra_bytes
                        break
                else:
                    # Four bytes were tried and none decoded. Fail loudly so
                    # decoding problems are easy to spot.
                    raise ValueError("UNEXPECTED UTF-8 DECODE ERROR\n{}\n{}\n{}".format(data, i, data[i]))
            i += 1
        self.ensureCursorVisible()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
class QTextCursor:
    """Stand-in for the Qt ``QTextCursor`` constants used by the fake pane.

    The values are opaque markers: the fake pane's ``movePosition`` ignores
    its arguments, so only the attribute names matter.
    """

    Up = 'up'
    Down = 'down'
    Right = 'right'
    Left = 'left'
    # Referenced by the newline and "delete to end of line" code paths;
    # without them those paths raise AttributeError.
    End = 'end'
    EndOfLine = 'end_of_line'
    KeepAnchor = 'keep_anchor'


class FakePaneTwo:
    """Fake REPL pane that decodes each incoming chunk in a single step.

    The Qt text-widget methods are no-op stubs; text passed to
    ``insertPlainText`` is collected and exposed through ``print_output``.
    """

    def __init__(self):
        # Pieces of text "inserted" into the widget, in order.
        self.print_data = []
        # Raw bytes held back from the previous call (incomplete character
        # and/or a trailing lone <Esc>).
        self.previous_data = b""

    def insertPlainText(self, data):
        """Record *data* as text inserted into the widget."""
        self.print_data.append(data)

    @property
    def print_output(self):
        """All text inserted so far, joined into a single string."""
        return "".join(self.print_data)

    def setTextCursor(self, *args, **kwargs):
        """No-op stub."""
        pass

    def textCursor(self, *args, **kwargs):
        """Return the pane itself, which doubles as its own cursor."""
        return self

    def movePosition(self, *args, **kwargs):
        """No-op stub; always reports that the cursor did not move."""
        return False

    def removeSelectedText(self, *args, **kwargs):
        """No-op stub."""
        pass

    def deleteChar(self, *args, **kwargs):
        """No-op stub."""
        pass

    def ensureCursorVisible(self, *args, **kwargs):
        """No-op stub."""
        pass

    def position(self, *args, **kwargs):
        """Stub cursor position; always 0."""
        return 0

    @staticmethod
    def _incomplete_tail_length(data):
        """Return how many trailing bytes of *data* are an unfinished char.

        Walks back over at most four bytes to the last UTF-8 lead byte and
        compares the length it announces with the bytes actually present.
        Returns 0 when *data* is empty or ends on a complete character.
        """
        for back in range(1, min(4, len(data)) + 1):
            byte = data[-back]
            if (byte >> 6) == 0b10:
                # Continuation byte, keep looking for the lead byte.
                continue
            if byte >= 0b11110000:
                needed = 4
            elif byte >= 0b11100000:
                needed = 3
            elif byte >= 0b11000000:
                needed = 2
            else:
                needed = 1  # ASCII
            return back if back < needed else 0
        return 0

    # Manually decode full data byte array first
    def process_bytes(self, data):
        """
        Given some incoming bytes of data, work out how to handle / display
        them in the REPL widget.

        An incomplete trailing UTF-8 character, or a lone <Esc> at the very
        end of the chunk, is kept in ``self.previous_data`` and prepended to
        the next call. Bytes that are not valid UTF-8 are shown as U+FFFD.
        """
        tc = self.textCursor()
        # The text cursor must be on the last line of the document. If it isn't
        # then move it there.
        while tc.movePosition(QTextCursor.Down):
            pass
        # If previous iterations had incomplete characters add them to the head
        if self.previous_data:
            data = self.previous_data + data
            self.previous_data = b''
        # Drop an opening utf-8 sequence without a starting byte
        # After the 1st byte, the rest of the bytes in the char start with 0b10
        drop_bytes = 0
        while drop_bytes < len(data) and (data[drop_bytes] >> 6) == 0b10:
            drop_bytes += 1
        data = data[drop_bytes:]
        # If the last utf-8 character is incomplete, save it for the next
        # iteration. A complete trailing character is emitted right away.
        save_bytes = self._incomplete_tail_length(data)
        if save_bytes:
            self.previous_data = data[-save_bytes:]
            data = data[:-save_bytes]
        # Replace malformed bytes so the loop below always works on text.
        data = data.decode("utf-8", errors="replace")
        i = 0
        while i < len(data):
            if data[i] == "\b":
                tc.movePosition(QTextCursor.Left)
                self.setTextCursor(tc)
            elif data[i] == "\r":
                pass
            elif data[i] == "\n":
                tc.movePosition(QTextCursor.End)
                self.setTextCursor(tc)
                self.insertPlainText(data[i])
            elif data[i] == "\x1b":  # <Esc>
                if not (len(data) > i + 1):
                    # End of data with an possible incomplete VT100 escape code
                    # so save it for the next time this function is called.
                    # previous_data holds bytes, so re-encode the text, and
                    # keep it ahead of any partial character saved above.
                    self.previous_data = (
                        data[i:].encode("utf-8") + self.previous_data
                    )
                    break
                # Check the next character for a VT100 escape code
                if data[i + 1] == "[":
                    # VT100 cursor detected: <Esc>[
                    i += 2  # move index to after the [
                    regex = r"(?P<count>[\d]*)(;?[\d]*)*(?P<action>[ABCDKm])"
                    m = re.search(regex, data[i:])
                    if m:
                        # move to (almost) after control seq
                        # (will ++ at end of loop)
                        i += m.end() - 1
                        if m.group("count") == "":
                            count = 1
                        else:
                            count = int(m.group("count"))
                        action = m.group("action")
                        if action == "A":  # up
                            tc.movePosition(QTextCursor.Up, n=count)
                            self.setTextCursor(tc)
                        elif action == "B":  # down
                            tc.movePosition(QTextCursor.Down, n=count)
                            self.setTextCursor(tc)
                        elif action == "C":  # right
                            tc.movePosition(QTextCursor.Right, n=count)
                            self.setTextCursor(tc)
                        elif action == "D":  # left
                            tc.movePosition(QTextCursor.Left, n=count)
                            self.setTextCursor(tc)
                        elif action == "K":  # delete things
                            if m.group("count") == "":  # delete to end of line
                                tc.movePosition(
                                    QTextCursor.EndOfLine,
                                    mode=QTextCursor.KeepAnchor,
                                )
                                tc.removeSelectedText()
                                self.setTextCursor(tc)
            else:
                tc.deleteChar()
                self.setTextCursor(tc)
                self.insertPlainText(data[i])
            i += 1
        self.ensureCursorVisible()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import codecs | |
class QTextCursor:
    """Stand-in for the Qt ``QTextCursor`` constants used by the fake pane.

    The values are opaque markers: the fake pane's ``movePosition`` ignores
    its arguments, so only the attribute names matter.
    """

    Up = 'up'
    Down = 'down'
    Right = 'right'
    Left = 'left'
    # Referenced by the newline and "delete to end of line" code paths;
    # without them those paths raise AttributeError.
    End = 'end'
    EndOfLine = 'end_of_line'
    KeepAnchor = 'keep_anchor'


class FakePaneThree:
    """Fake REPL pane that decodes input with an incremental UTF-8 decoder.

    The Qt text-widget methods are no-op stubs; text passed to
    ``insertPlainText`` is collected and exposed through ``print_output``.
    """

    def __init__(self):
        # Pieces of text "inserted" into the widget, in order.
        self.print_data = []
        # Keeps the bytes of a character split across calls until complete.
        self.decoder = codecs.getincrementaldecoder("utf8")()
        # Decoded text held back from the previous call (a trailing <Esc>).
        self.pending_text = ""

    def insertPlainText(self, data):
        """Record *data* as text inserted into the widget."""
        self.print_data.append(data)

    @property
    def print_output(self):
        """All text inserted so far, joined into a single string."""
        return "".join(self.print_data)

    def setTextCursor(self, *args, **kwargs):
        """No-op stub."""
        pass

    def textCursor(self, *args, **kwargs):
        """Return the pane itself, which doubles as its own cursor."""
        return self

    def movePosition(self, *args, **kwargs):
        """No-op stub; always reports that the cursor did not move."""
        return False

    def removeSelectedText(self, *args, **kwargs):
        """No-op stub."""
        pass

    def deleteChar(self, *args, **kwargs):
        """No-op stub."""
        pass

    def ensureCursorVisible(self, *args, **kwargs):
        """No-op stub."""
        pass

    def position(self, *args, **kwargs):
        """Stub cursor position; always 0."""
        return 0

    # Uses standard library codecs.incrementaldecoder
    def process_bytes(self, data):
        """
        Given some incoming bytes of data, work out how to handle / display
        them in the REPL widget.

        A lone <Esc> at the very end of the decoded text is kept in
        ``self.pending_text`` and prepended to the next call, because it may
        be the start of a VT100 sequence.
        """
        tc = self.textCursor()
        # The text cursor must be on the last line of the document. If it isn't
        # then move it there.
        while tc.movePosition(QTextCursor.Down):
            pass
        data = self.pending_text + self.decoder.decode(data)
        self.pending_text = ""
        i = 0
        while i < len(data):
            if data[i] == "\b":  # \b
                tc.movePosition(QTextCursor.Left)
                self.setTextCursor(tc)
            elif data[i] == "\r":  # \r
                pass
            elif data[i] == "\x1b" and len(data) == i + 1:
                # <Esc> is the last character received: wait for more input
                # instead of printing it.
                self.pending_text = data[i:]
                break
            elif len(data) > i + 1 and data[i] == "\x1b" and data[i + 1] == "[":
                # VT100 cursor detected: <Esc>[
                i += 2  # move index to after the [
                regex = r"(?P<count>[\d]*)(;?[\d]*)*(?P<action>[ABCDKm])"
                m = re.search(regex, data[i:])
                if m:
                    # move to (almost) after control seq
                    # (will ++ at end of loop)
                    i += m.end() - 1
                    if m.group("count") == "":
                        count = 1
                    else:
                        count = int(m.group("count"))
                    action = m.group("action")
                    if action == "A":  # up
                        tc.movePosition(QTextCursor.Up, n=count)
                        self.setTextCursor(tc)
                    elif action == "B":  # down
                        tc.movePosition(QTextCursor.Down, n=count)
                        self.setTextCursor(tc)
                    elif action == "C":  # right
                        tc.movePosition(QTextCursor.Right, n=count)
                        self.setTextCursor(tc)
                    elif action == "D":  # left
                        tc.movePosition(QTextCursor.Left, n=count)
                        self.setTextCursor(tc)
                    elif action == "K":  # delete things
                        if m.group("count") == "":  # delete to end of line
                            tc.movePosition(
                                QTextCursor.EndOfLine,
                                mode=QTextCursor.KeepAnchor,
                            )
                            tc.removeSelectedText()
                            self.setTextCursor(tc)
            elif data[i] == "\n":  # \n
                tc.movePosition(QTextCursor.End)
                self.setTextCursor(tc)
                self.insertPlainText(data[i])
            else:
                tc.deleteChar()
                self.setTextCursor(tc)
                self.insertPlainText(data[i])
            i += 1
        self.ensureCursorVisible()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import codecs | |
class QTextCursor:
    """Stand-in for the Qt ``QTextCursor`` constants used by the fake pane.

    The values are opaque markers: the fake pane's ``movePosition`` ignores
    its arguments, so only the attribute names matter.
    """

    Up = 'up'
    Down = 'down'
    Right = 'right'
    Left = 'left'
    # Referenced by the newline and "delete to end of line" code paths;
    # without them those paths raise AttributeError.
    End = 'end'
    EndOfLine = 'end_of_line'
    KeepAnchor = 'keep_anchor'


class FakePaneFour:
    """Fake REPL pane using the VT100 and decoding logic of mu PR 1026.

    The Qt text-widget methods are no-op stubs; text passed to
    ``insertPlainText`` is collected and exposed through ``print_output``.
    """

    def __init__(self):
        # Pieces of text "inserted" into the widget, in order.
        self.print_data = []
        # Decoded text held back from the previous call. It is a str because
        # it is concatenated with decoded text in process_bytes.
        self.unprocessed_input = ""
        # Keeps the bytes of a character split across calls until complete.
        self.decoder = codecs.getincrementaldecoder("utf8")()
        # Last cursor position recorded by process_bytes.
        self.device_cursor_position = 0

    def insertPlainText(self, data):
        """Record *data* as text inserted into the widget."""
        self.print_data.append(data)

    @property
    def print_output(self):
        """All text inserted so far, joined into a single string."""
        return "".join(self.print_data)

    def setTextCursor(self, *args, **kwargs):
        """No-op stub."""
        pass

    def textCursor(self, *args, **kwargs):
        """Return the pane itself, which doubles as its own cursor."""
        return self

    def movePosition(self, *args, **kwargs):
        """No-op stub; always reports that the cursor did not move."""
        return False

    def removeSelectedText(self, *args, **kwargs):
        """No-op stub."""
        pass

    def deleteChar(self, *args, **kwargs):
        """No-op stub."""
        pass

    def ensureCursorVisible(self, *args, **kwargs):
        """No-op stub."""
        pass

    def position(self, *args, **kwargs):
        """Stub cursor position; always 0."""
        return 0

    def set_qtcursor_to_devicecursor(self, *args, **kwargs):
        """No-op stub."""
        pass

    # VT100 and decoding from https://github.com/mu-editor/mu/pull/1026
    def process_bytes(self, data):
        """
        Given some incoming bytes of data, work out how to handle / display
        them in the REPL widget.
        If received input is incomplete, stores remainder in
        self.unprocessed_input.
        Updates the self.device_cursor_position to match that of the device
        for every input received.
        """
        i = 0
        data = self.decoder.decode(data)
        if len(self.unprocessed_input) > 0:
            # Prepend bytes from last time, that wasn't processed
            data = self.unprocessed_input + data
            self.unprocessed_input = ""
        # Reset cursor. E.g. if doing a selection, the qt cursor and
        # device cursor will not match, we reset it here to make sure
        # they do match (this removes any selections when new input is
        # received)
        self.set_qtcursor_to_devicecursor()
        tc = self.textCursor()
        while i < len(data):
            if ord(data[i]) == 8:  # \b
                tc.movePosition(QTextCursor.Left)
                self.device_cursor_position = tc.position()
            elif ord(data[i]) == 13:  # \r
                # Carriage return. Do nothing, we handle newlines when
                # reading \n
                pass
            elif ord(data[i]) == 27:
                # Escape
                if len(data) > i + 1 and ord(data[i + 1]) == 91:
                    # VT100 cursor detected: <Esc>[
                    regex = (
                        r"\x1B\[(?P<count>[\d]*)(;?[\d]*)*(?P<action>[A-Za-z])"
                    )
                    match = re.search(regex, data[i:])
                    if match:
                        # move to (almost) after control seq
                        # (will ++ at end of loop)
                        i += match.end() - 1
                        count_string = match.group("count")
                        count = 1 if count_string == "" else int(count_string)
                        action = match.group("action")
                        if action == "A":  # up
                            tc.movePosition(QTextCursor.Up, n=count)
                            self.device_cursor_position = tc.position()
                        elif action == "B":  # down
                            tc.movePosition(QTextCursor.Down, n=count)
                            self.device_cursor_position = tc.position()
                        elif action == "C":  # right
                            tc.movePosition(QTextCursor.Right, n=count)
                            self.device_cursor_position = tc.position()
                        elif action == "D":  # left
                            tc.movePosition(QTextCursor.Left, n=count)
                            self.device_cursor_position = tc.position()
                        elif action == "K":  # delete things
                            if count_string == "":  # delete to end of line
                                tc.movePosition(
                                    QTextCursor.EndOfLine,
                                    mode=QTextCursor.KeepAnchor,
                                )
                                tc.removeSelectedText()
                                self.device_cursor_position = tc.position()
                        else:
                            # Unknown action, log warning and ignore.
                            # Imported locally because this file defines no
                            # module-level logger.
                            import logging

                            command = match.group(0).replace("\x1B", "<Esc>")
                            msg = "Received unsupported VT100 command: {}"
                            logging.getLogger(__name__).warning(
                                msg.format(command)
                            )
                    else:
                        # Cursor detected, but no match, must be
                        # incomplete input
                        self.unprocessed_input = data[i:]
                        break
                elif len(data) == i + 1:
                    # Escape received as end of transmission. Perhaps
                    # the transmission is incomplete, wait until next
                    # bytes are received to determine what to do
                    self.unprocessed_input = data[i:]
                    break
            elif ord(data[i]) == 10:  # \n - newline
                tc.movePosition(QTextCursor.End)
                self.device_cursor_position = tc.position() + 1
                self.setTextCursor(tc)
                self.insertPlainText(data[i])
            else:
                # Char received, with VT100 that should be interpreted
                # as overwrite the char in front of the cursor
                tc.deleteChar()
                self.device_cursor_position = tc.position() + 1
                self.insertPlainText(data[i])
            self.setTextCursor(tc)
            i += 1
        # Scroll textarea if necessary to see cursor
        self.ensureCursorVisible()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import codecs | |
class QTextCursor:
    """Stand-in for the Qt ``QTextCursor`` constants used by the fake pane.

    The values are opaque markers: the fake pane's ``movePosition`` ignores
    its arguments, so only the attribute names matter.
    """

    Up = 'up'
    Down = 'down'
    Right = 'right'
    Left = 'left'
    # Referenced by the newline and "delete to end of line" code paths;
    # without them those paths raise AttributeError.
    End = 'end'
    EndOfLine = 'end_of_line'
    KeepAnchor = 'keep_anchor'


class FakePaneFive:
    """Fake REPL pane using the mu PR 1026 logic with a precompiled regex.

    The Qt text-widget methods are no-op stubs; text passed to
    ``insertPlainText`` is collected and exposed through ``print_output``.
    """

    def __init__(self):
        # Pieces of text "inserted" into the widget, in order.
        self.print_data = []
        # Decoded text held back from the previous call. It is a str because
        # it is concatenated with decoded text in process_bytes.
        self.unprocessed_input = ""
        # Keeps the bytes of a character split across calls until complete.
        self.decoder = codecs.getincrementaldecoder("utf8")()
        # Compiled once here instead of on every escape sequence.
        self.vt100_regex = re.compile(
            r"\x1B\[(?P<count>[\d]*)(;?[\d]*)*(?P<action>[A-Za-z])"
        )
        # Last cursor position recorded by process_bytes.
        self.device_cursor_position = 0

    def insertPlainText(self, data):
        """Record *data* as text inserted into the widget."""
        self.print_data.append(data)

    @property
    def print_output(self):
        """All text inserted so far, joined into a single string."""
        return "".join(self.print_data)

    def setTextCursor(self, *args, **kwargs):
        """No-op stub."""
        pass

    def textCursor(self, *args, **kwargs):
        """Return the pane itself, which doubles as its own cursor."""
        return self

    def movePosition(self, *args, **kwargs):
        """No-op stub; always reports that the cursor did not move."""
        return False

    def removeSelectedText(self, *args, **kwargs):
        """No-op stub."""
        pass

    def deleteChar(self, *args, **kwargs):
        """No-op stub."""
        pass

    def ensureCursorVisible(self, *args, **kwargs):
        """No-op stub."""
        pass

    def position(self, *args, **kwargs):
        """Stub cursor position; always 0."""
        return 0

    def set_qtcursor_to_devicecursor(self, *args, **kwargs):
        """No-op stub."""
        pass

    # VT100 and decoding from https://github.com/mu-editor/mu/pull/1026
    def process_bytes(self, data):
        """
        Given some incoming bytes of data, work out how to handle / display
        them in the REPL widget.
        If received input is incomplete, stores remainder in
        self.unprocessed_input.
        Updates the self.device_cursor_position to match that of the device
        for every input received.
        """
        i = 0
        data = self.decoder.decode(data)
        if len(self.unprocessed_input) > 0:
            # Prepend bytes from last time, that wasn't processed
            data = self.unprocessed_input + data
            self.unprocessed_input = ""
        # Reset cursor. E.g. if doing a selection, the qt cursor and
        # device cursor will not match, we reset it here to make sure
        # they do match (this removes any selections when new input is
        # received)
        self.set_qtcursor_to_devicecursor()
        tc = self.textCursor()
        while i < len(data):
            if data[i] == "\b":  # \b
                tc.movePosition(QTextCursor.Left)
                self.device_cursor_position = tc.position()
            elif data[i] == "\r":  # \r
                # Carriage return. Do nothing, we handle newlines when
                # reading \n
                pass
            elif data[i] == "\x1b":
                # Escape
                if len(data) > i + 1 and data[i + 1] == "[":
                    # VT100 cursor detected: <Esc>[
                    match = self.vt100_regex.search(data[i:])
                    if match:
                        # move to (almost) after control seq
                        # (will ++ at end of loop)
                        i += match.end() - 1
                        count_string = match.group("count")
                        count = 1 if count_string == "" else int(count_string)
                        action = match.group("action")
                        if action == "A":  # up
                            tc.movePosition(QTextCursor.Up, n=count)
                            self.device_cursor_position = tc.position()
                        elif action == "B":  # down
                            tc.movePosition(QTextCursor.Down, n=count)
                            self.device_cursor_position = tc.position()
                        elif action == "C":  # right
                            tc.movePosition(QTextCursor.Right, n=count)
                            self.device_cursor_position = tc.position()
                        elif action == "D":  # left
                            tc.movePosition(QTextCursor.Left, n=count)
                            self.device_cursor_position = tc.position()
                        elif action == "K":  # delete things
                            if count_string == "":  # delete to end of line
                                tc.movePosition(
                                    QTextCursor.EndOfLine,
                                    mode=QTextCursor.KeepAnchor,
                                )
                                tc.removeSelectedText()
                                self.device_cursor_position = tc.position()
                        else:
                            # Unknown action, log warning and ignore.
                            # Imported locally because this file defines no
                            # module-level logger.
                            import logging

                            command = match.group(0).replace("\x1B", "<Esc>")
                            msg = "Received unsupported VT100 command: {}"
                            logging.getLogger(__name__).warning(
                                msg.format(command)
                            )
                    else:
                        # Cursor detected, but no match, must be
                        # incomplete input
                        self.unprocessed_input = data[i:]
                        break
                elif len(data) == i + 1:
                    # Escape received as end of transmission. Perhaps
                    # the transmission is incomplete, wait until next
                    # bytes are received to determine what to do
                    self.unprocessed_input = data[i:]
                    break
            elif data[i] == "\n":  # \n - newline
                tc.movePosition(QTextCursor.End)
                self.device_cursor_position = tc.position() + 1
                self.setTextCursor(tc)
                self.insertPlainText(data[i])
            else:
                # Char received, with VT100 that should be interpreted
                # as overwrite the char in front of the cursor
                tc.deleteChar()
                self.device_cursor_position = tc.position() + 1
                self.insertPlainText(data[i])
            self.setTextCursor(tc)
            i += 1
        # Scroll textarea if necessary to see cursor
        self.ensureCursorVisible()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
class QTextCursor:
    """Stand-in for the Qt ``QTextCursor`` constants used by the fake pane.

    The values are opaque markers: the fake pane's ``movePosition`` ignores
    its arguments, so only the attribute names matter.
    """

    Up = 'up'
    Down = 'down'
    Right = 'right'
    Left = 'left'
    # Referenced by the newline and "delete to end of line" code paths;
    # without them those paths raise AttributeError.
    End = 'end'
    EndOfLine = 'end_of_line'
    KeepAnchor = 'keep_anchor'


class FakePaneOriginal:
    """Fake REPL pane that handles input one byte at a time.

    The Qt text-widget methods are no-op stubs; text passed to
    ``insertPlainText`` is collected and exposed through ``print_output``.
    Every byte that is not control input is converted with ``chr()``, so a
    multi-byte UTF-8 character comes out as one character per byte. That
    behavior is left unchanged here on purpose.
    """

    def __init__(self):
        # Pieces of text "inserted" into the widget, in order.
        self.print_data = []
        # Not used by process_bytes in this class.
        self.previous_data = b""

    def insertPlainText(self, data):
        """Record *data* as text inserted into the widget."""
        self.print_data.append(data)

    @property
    def print_output(self):
        """All text inserted so far, joined into a single string."""
        return "".join(self.print_data)

    def setTextCursor(self, *args, **kwargs):
        """No-op stub."""
        pass

    def textCursor(self, *args, **kwargs):
        """Return the pane itself, which doubles as its own cursor."""
        return self

    def movePosition(self, *args, **kwargs):
        """No-op stub; always reports that the cursor did not move."""
        return False

    def removeSelectedText(self, *args, **kwargs):
        """No-op stub."""
        pass

    def deleteChar(self, *args, **kwargs):
        """No-op stub."""
        pass

    def ensureCursorVisible(self, *args, **kwargs):
        """No-op stub."""
        pass

    def position(self, *args, **kwargs):
        """Stub cursor position; always 0."""
        return 0

    def process_bytes(self, data):
        """
        Given some incoming bytes of data, work out how to handle / display
        them in the REPL widget.
        """
        tc = self.textCursor()
        # The text cursor must be on the last line of the document. If it isn't
        # then move it there.
        while tc.movePosition(QTextCursor.Down):
            pass
        i = 0
        while i < len(data):
            if data[i] == 8:  # \b
                tc.movePosition(QTextCursor.Left)
                self.setTextCursor(tc)
            elif data[i] == 13:  # \r
                pass
            elif len(data) > i + 1 and data[i] == 27 and data[i + 1] == 91:
                # VT100 cursor detected: <Esc>[
                i += 2  # move index to after the [
                regex = r"(?P<count>[\d]*)(;?[\d]*)*(?P<action>[ABCDKm])"
                m = re.search(regex, data[i:].decode("utf-8"))
                if m:
                    # move to (almost) after control seq
                    # (will ++ at end of loop)
                    i += m.end() - 1
                    if m.group("count") == "":
                        count = 1
                    else:
                        count = int(m.group("count"))
                    action = m.group("action")
                    if action == "A":  # up
                        tc.movePosition(QTextCursor.Up, n=count)
                        self.setTextCursor(tc)
                    elif action == "B":  # down
                        tc.movePosition(QTextCursor.Down, n=count)
                        self.setTextCursor(tc)
                    elif action == "C":  # right
                        tc.movePosition(QTextCursor.Right, n=count)
                        self.setTextCursor(tc)
                    elif action == "D":  # left
                        tc.movePosition(QTextCursor.Left, n=count)
                        self.setTextCursor(tc)
                    elif action == "K":  # delete things
                        if m.group("count") == "":  # delete to end of line
                            tc.movePosition(
                                QTextCursor.EndOfLine,
                                mode=QTextCursor.KeepAnchor,
                            )
                            tc.removeSelectedText()
                            self.setTextCursor(tc)
            elif data[i] == 10:  # \n
                tc.movePosition(QTextCursor.End)
                self.setTextCursor(tc)
                self.insertPlainText(chr(data[i]))
            else:
                tc.deleteChar()
                self.setTextCursor(tc)
                # One output character per input byte (see class docstring).
                self.insertPlainText(chr(data[i]))
            i += 1
        self.ensureCursorVisible()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Results: