Created
January 22, 2026 15:47
-
-
Save jquast/8f1b0103096604a72f74618f2f7a82b3 to your computer and use it in GitHub Desktop.
Test for GitHub issue #103 of telnetlib3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Test for GitHub issue #103: 64-character command length bug.""" | |
| import asyncio | |
| import pytest | |
| import telnetlib3 | |
| from telnetlib3.tests.accessories import unused_tcp_port, bind_host # noqa: F401 | |
| from telnetlib3.telopt import IAC, DO, WONT, TTYPE | |
@pytest.mark.parametrize(
    "cmd_length",
    [1, 32, 63, 64, 65, 66, 100, 127, 128, 129, 192, 200, 255, 256, 512],
)
async def test_client_send_various_lengths(bind_host, unused_tcp_port, cmd_length):
    """Test that telnetlib3 client correctly sends commands of various lengths.

    A plain asyncio TCP server records every chunk it receives.  The telnetlib3
    client writes ``cmd_length`` characters followed by a newline, and the
    recorded byte stream must contain that payload unmodified.
    """
    server_received = []
    server_ready = asyncio.Event()

    class RecordingProtocol(asyncio.Protocol):
        """Raw TCP peer that stores whatever the client sends."""

        def connection_made(self, transport):
            self.transport = transport
            server_ready.set()

        def data_received(self, data):
            server_received.append(bytes(data))

    loop = asyncio.get_running_loop()
    server = await loop.create_server(RecordingProtocol, bind_host, unused_tcp_port)
    writer = None
    try:
        _, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.05,
        )
        # Bounded wait: a server that never accepts must fail the test, not hang it.
        await asyncio.wait_for(server_ready.wait(), 1.0)

        test_cmd = "a" * cmd_length + "\n"
        expected_payload = test_cmd.encode("utf-8")
        writer.write(test_cmd)
        await writer.drain()

        # Poll until the payload shows up (or the deadline passes) rather than
        # sleeping a fixed interval and hoping delivery has finished.
        deadline = loop.time() + 1.0
        while expected_payload not in b"".join(server_received):
            if loop.time() >= deadline:
                break
            await asyncio.sleep(0.01)

        assert expected_payload in b"".join(server_received)
    finally:
        # Release the client connection and the listening socket even when an
        # assertion above fails.
        if writer is not None:
            writer.close()
        server.close()
@pytest.mark.parametrize(
    "cmd_length",
    [1, 32, 63, 64, 65, 66, 100, 127, 128, 129, 192, 200, 255, 256, 512],
)
async def test_server_receive_various_lengths(bind_host, unused_tcp_port, cmd_length):
    """Test that telnetlib3 server correctly receives commands of various lengths.

    A raw asyncio stream client answers the server's initial ``IAC DO TTYPE``
    with ``IAC WONT TTYPE``, then sends ``cmd_length`` characters followed by
    CR LF.  The text collected by the server shell must contain the command.
    """
    received_data = []
    shell_ready = asyncio.Event()

    async def recording_shell(reader, writer):
        shell_ready.set()
        while True:
            try:
                data = await asyncio.wait_for(reader.read(1024), timeout=0.5)
            except asyncio.TimeoutError:
                break
            if not data:
                break
            received_data.append(data)

    server = await telnetlib3.create_server(
        host=bind_host,
        port=unused_tcp_port,
        shell=recording_shell,
        connect_maxwait=0.05,
    )
    raw_writer = None
    try:
        raw_reader, raw_writer = await asyncio.open_connection(
            host=bind_host, port=unused_tcp_port
        )

        expect_hello = IAC + DO + TTYPE
        hello = await asyncio.wait_for(raw_reader.readexactly(len(expect_hello)), 0.5)
        assert hello == expect_hello

        raw_writer.write(IAC + WONT + TTYPE)
        await asyncio.wait_for(shell_ready.wait(), 0.5)

        expected_cmd = "x" * cmd_length
        raw_writer.write((expected_cmd + "\r\n").encode("ascii"))
        await raw_writer.drain()

        # Poll until the shell has seen the command (or the deadline passes)
        # rather than relying on a fixed sleep.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 1.0
        while expected_cmd not in "".join(received_data):
            if loop.time() >= deadline:
                break
            await asyncio.sleep(0.01)

        assert expected_cmd in "".join(received_data)
    finally:
        # Close the raw client and the listening socket even on failure.
        if raw_writer is not None:
            raw_writer.close()
        server.close()
@pytest.mark.parametrize(
    "cmd_length",
    [1, 32, 63, 64, 65, 66, 100, 127, 128, 129, 192, 200, 255, 256, 512],
)
async def test_full_roundtrip_echo(bind_host, unused_tcp_port, cmd_length):
    """Test full roundtrip with echo: send command, receive echo, receive response.

    The server shell reads one line, passes it to ``writer.echo`` and then
    writes a marker line.  The client sends ``cmd_length`` characters plus
    CR LF and must see the marker in what it reads back.
    """
    response_marker = "RESPONSE_OK"

    async def echo_shell(reader, writer):
        while True:
            # Only the bounded readline can time out, so only it is guarded.
            try:
                line = await asyncio.wait_for(reader.readline(), timeout=0.5)
            except asyncio.TimeoutError:
                break
            if not line:
                break
            writer.echo(line)
            writer.write(f"\n{response_marker}\n")
            await writer.drain()

    server = await telnetlib3.create_server(
        host=bind_host,
        port=unused_tcp_port,
        shell=echo_shell,
        connect_maxwait=0.05,
    )
    writer = None
    try:
        reader, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.05,
        )

        test_cmd = "e" * cmd_length
        writer.write(test_cmd + "\r\n")
        await writer.drain()

        collected = ""
        try:
            while response_marker not in collected:
                chunk = await asyncio.wait_for(reader.read(1024), timeout=0.5)
                if not chunk:
                    break
                collected += chunk
        except asyncio.TimeoutError:
            # A timeout is reported by the assertion below, which shows what
            # was actually received.
            pass

        assert response_marker in collected, f"received only {collected!r}"
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
@pytest.mark.parametrize(
    "cmd_length",
    [63, 64, 65],
)
async def test_raw_echo_boundary_64(bind_host, unused_tcp_port, cmd_length):
    """Test raw byte echo at 64-character boundary without telnet negotiation.

    The peer is a plain asyncio protocol that records and echoes every byte.
    The test checks both directions: the bytes the server saw must equal the
    encoded payload, and the text the client reads back must equal the payload.
    """
    echo_received = bytearray()
    echo_complete = asyncio.Event()

    class EchoProtocol(asyncio.Protocol):
        """Raw TCP peer that records incoming bytes and writes them back."""

        def connection_made(self, transport):
            self.transport = transport

        def data_received(self, data):
            echo_received.extend(data)
            self.transport.write(data)
            if b"\n" in data:
                # Protocol callbacks run on the event loop, so the event can
                # be set directly.
                echo_complete.set()

    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoProtocol, bind_host, unused_tcp_port)
    writer = None
    try:
        reader, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.05,
        )

        test_payload = "Z" * cmd_length + "\n"
        writer.write(test_payload)
        await writer.drain()

        await asyncio.wait_for(echo_complete.wait(), 0.5)
        assert bytes(echo_received) == test_payload.encode("utf-8")

        client_received = ""
        try:
            while len(client_received) < len(test_payload):
                chunk = await asyncio.wait_for(reader.read(100), timeout=0.3)
                if not chunk:
                    break
                client_received += chunk
        except asyncio.TimeoutError:
            # A short read is reported by the equality assertion below.
            pass

        assert client_received == test_payload
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
async def test_64_char_exact_boundary_detailed(bind_host, unused_tcp_port):
    """Detailed test specifically for 64-character exact boundary issue.

    The server shell reads one character at a time and writes it straight
    back, logging each step.  The client sends exactly 64 characters and must
    read the same 64 characters back, with no cursor-up escape sequence mixed
    in.  Both logs are included in the assertion messages.
    """
    server_log = []
    client_log = []

    async def logging_shell(reader, writer):
        while True:
            try:
                data = await asyncio.wait_for(reader.read(1), timeout=0.5)
            except asyncio.TimeoutError:
                break
            if not data:
                break
            server_log.append(("recv", repr(data)))
            writer.write(data)
            server_log.append(("echo", repr(data)))
            await writer.drain()

    server = await telnetlib3.create_server(
        host=bind_host,
        port=unused_tcp_port,
        shell=logging_shell,
        connect_maxwait=0.05,
    )
    writer = None
    try:
        reader, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.05,
        )

        test_64 = "A" * 64
        writer.write(test_64)
        await writer.drain()
        client_log.append(("sent", repr(test_64)))

        await asyncio.sleep(0.2)

        echoed = ""
        try:
            while len(echoed) < 64:
                chunk = await asyncio.wait_for(reader.read(100), timeout=0.2)
                if not chunk:
                    break
                echoed += chunk
                client_log.append(("recv", repr(chunk)))
        except asyncio.TimeoutError:
            # A short read is reported by the assertions below.
            pass

        trace = f"client log: {client_log!r}; server log: {server_log!r}"
        # Check for the escape sequence first: after the equality assertion it
        # could never fail, so it would add no diagnostic value there.
        escape_seq = "\x1b[A"
        assert escape_seq not in echoed, trace
        assert len(echoed) == 64, trace
        assert echoed == test_64, trace
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
async def test_command_boundary_with_newline(bind_host, unused_tcp_port):
    """Test command ending with newline at various boundaries.

    Sends 63-, 64- and 65-character commands, each terminated by CR LF, and
    expects an ``OK`` line after each one.  The lines captured by the server
    shell must then have exactly those lengths once line endings are stripped.
    """
    expected_lengths = [63, 64, 65]
    captured_lines = []
    shell_ready = asyncio.Event()

    async def capture_shell(reader, writer):
        shell_ready.set()
        while True:
            try:
                line = await asyncio.wait_for(reader.readline(), timeout=1.0)
            except asyncio.TimeoutError:
                break
            if not line:
                break
            captured_lines.append(line)
            writer.write("OK\r\n")
            await writer.drain()

    server = await telnetlib3.create_server(
        host=bind_host,
        port=unused_tcp_port,
        shell=capture_shell,
        connect_maxwait=0.05,
    )
    writer = None
    try:
        reader, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.10,
        )
        await asyncio.wait_for(shell_ready.wait(), 1.0)
        await asyncio.sleep(0.05)

        for length in expected_lengths:
            writer.write("c" * length + "\r\n")
            await writer.drain()
            # Keep the try body to the one call that can time out.
            try:
                response = await asyncio.wait_for(reader.readline(), timeout=1.0)
            except asyncio.TimeoutError:
                pytest.fail(f"No response for {length}-char command")
            assert "OK" in response

        await asyncio.sleep(0.1)

        assert len(captured_lines) == 3
        for line, expected_len in zip(captured_lines, expected_lengths):
            assert len(line.rstrip("\r\n")) == expected_len
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
@pytest.mark.parametrize("encoding", ["utf-8", "ascii", "latin-1"])
async def test_64_char_various_encodings(bind_host, unused_tcp_port, encoding):
    """Test 64-character commands with various encodings.

    The client is opened with the parametrized ``encoding`` and sends 64
    characters with no line terminator.  The text collected by the server
    shell must contain all 64 characters.
    """
    received = []

    async def capture_shell(reader, writer):
        while True:
            try:
                data = await asyncio.wait_for(reader.read(100), timeout=0.3)
            except asyncio.TimeoutError:
                break
            if not data:
                break
            received.append(data)

    server = await telnetlib3.create_server(
        host=bind_host,
        port=unused_tcp_port,
        shell=capture_shell,
        connect_maxwait=0.05,
    )
    writer = None
    try:
        _, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            encoding=encoding,
            connect_minwait=0.05,
            connect_maxwait=0.05,
        )

        test_64 = "B" * 64
        writer.write(test_64)
        await writer.drain()

        # Poll until the shell has seen the data (or the deadline passes)
        # rather than relying on a fixed sleep.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 1.0
        while test_64 not in "".join(received):
            if loop.time() >= deadline:
                break
            await asyncio.sleep(0.01)

        assert test_64 in "".join(received)
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
async def test_sequential_64_char_commands(bind_host, unused_tcp_port):
    """Test sending multiple 64-character commands in sequence.

    The server shell answers the n-th line it reads with ``Response n``.  The
    client sends five 64-character commands one after another and must get
    the five responses back in order.
    """
    responses = []

    async def numbered_shell(reader, writer):
        count = 0
        while True:
            try:
                line = await asyncio.wait_for(reader.readline(), timeout=0.3)
            except asyncio.TimeoutError:
                break
            if not line:
                break
            count += 1
            writer.write(f"Response {count}\r\n")
            await writer.drain()

    server = await telnetlib3.create_server(
        host=bind_host,
        port=unused_tcp_port,
        shell=numbered_shell,
        connect_maxwait=0.05,
    )
    writer = None
    try:
        reader, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.05,
        )

        cmd = "D" * 64 + "\r\n"
        for i in range(5):
            writer.write(cmd)
            await writer.drain()
            # Keep the try body to the one call that can time out.
            try:
                response = await asyncio.wait_for(reader.readline(), timeout=0.3)
            except asyncio.TimeoutError:
                pytest.fail(f"No response for command {i + 1}")
            responses.append(response)

        assert len(responses) == 5
        for number, resp in enumerate(responses, start=1):
            assert f"Response {number}" in resp
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
async def test_interleaved_command_lengths(bind_host, unused_tcp_port):
    """Test interleaved commands of different lengths including 64.

    The server shell strips the line ending from every line it reads, records
    the remaining length and replies with ``len=<n>``.  The client sends
    commands of mixed lengths and checks each reply, then checks that the
    server recorded the same sequence of lengths.
    """
    seen_lengths = []
    shell_ready = asyncio.Event()

    async def length_echo_shell(reader, writer):
        shell_ready.set()
        while True:
            try:
                line = await asyncio.wait_for(reader.readline(), timeout=1.0)
            except asyncio.TimeoutError:
                break
            if not line:
                break
            content = line.rstrip("\r\n")
            seen_lengths.append(len(content))
            writer.write(f"len={len(content)}\r\n")
            await writer.drain()

    server = await telnetlib3.create_server(
        host=bind_host,
        port=unused_tcp_port,
        shell=length_echo_shell,
        connect_maxwait=0.05,
    )
    writer = None
    try:
        reader, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.10,
        )
        await asyncio.wait_for(shell_ready.wait(), 1.0)
        await asyncio.sleep(0.05)

        test_lengths = [32, 64, 128, 64, 63, 65, 64, 100]
        for length in test_lengths:
            writer.write("X" * length + "\r\n")
            await writer.drain()
            # Keep the try body to the one call that can time out.
            try:
                response = await asyncio.wait_for(reader.readline(), timeout=1.0)
            except asyncio.TimeoutError:
                pytest.fail(f"No response for {length}-char command")
            assert f"len={length}" in response

        await asyncio.sleep(0.1)

        assert seen_lengths == test_lengths
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
async def test_issue_103_echo_with_exact_64_chars(bind_host, unused_tcp_port):
    """Test issue #103: echo "a" * 64 pattern with terminal echo enabled.

    This test simulates the user's reported issue where a 64-character command
    results in unexpected escape sequences being returned.

    The server shell prints a prompt, reads one character at a time, passes
    each to ``writer.echo`` and prints ``command received`` plus a new prompt
    after a CR or LF.  The client sends a 64-character command and the reply
    must contain ``command received`` with no cursor-up escape sequence.
    """
    shell_ready = asyncio.Event()

    async def echo_shell(reader, writer):
        writer.write("prompt> ")
        await writer.drain()
        shell_ready.set()
        while True:
            try:
                char = await asyncio.wait_for(reader.read(1), timeout=1.0)
            except asyncio.TimeoutError:
                break
            if not char:
                break
            writer.echo(char)
            await writer.drain()
            if char in ("\r", "\n"):
                writer.write("\r\ncommand received\r\nprompt> ")
                await writer.drain()

    server = await telnetlib3.create_server(
        host=bind_host,
        port=unused_tcp_port,
        shell=echo_shell,
        connect_maxwait=0.05,
    )
    writer = None
    try:
        reader, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.10,
        )
        await asyncio.wait_for(shell_ready.wait(), 1.0)

        initial = await asyncio.wait_for(reader.read(100), timeout=0.5)
        assert "prompt>" in initial

        cmd_64 = "echo " + "a" * 59
        assert len(cmd_64) == 64
        writer.write(cmd_64 + "\n")
        await writer.drain()

        response = ""
        try:
            while "command received" not in response:
                chunk = await asyncio.wait_for(reader.read(256), timeout=0.5)
                if not chunk:
                    break
                response += chunk
        except asyncio.TimeoutError:
            # A missing reply is reported by the assertions below.
            pass

        cursor_up = "\x1b[A"
        assert cursor_up not in response
        assert "command received" in response
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
async def test_issue_103_ljust_pattern(bind_host, unused_tcp_port):
    """Test the exact pattern from issue #103: 'echo '.ljust(64, 'a').

    The user reported that 'echo '.ljust(64, 'a') causes issues.

    The server shell answers every line with ``OK``.  The client sends the
    64-character command and then a 65-character variant with a leading
    space; both must be answered.
    """
    shell_ready = asyncio.Event()

    async def simple_shell(reader, writer):
        shell_ready.set()
        while True:
            try:
                line = await asyncio.wait_for(reader.readline(), timeout=1.0)
            except asyncio.TimeoutError:
                break
            if not line:
                break
            writer.write("OK\r\n")
            await writer.drain()

    server = await telnetlib3.create_server(
        host=bind_host,
        port=unused_tcp_port,
        shell=simple_shell,
        connect_maxwait=0.05,
    )
    writer = None
    try:
        reader, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.10,
        )
        await asyncio.wait_for(shell_ready.wait(), 1.0)
        await asyncio.sleep(0.05)

        non_working_cmd = "echo ".ljust(64, "a")
        assert len(non_working_cmd) == 64
        working_cmd = " " + "echo ".ljust(64, "a")
        assert len(working_cmd) == 65

        # Same send/expect sequence for both commands; only the failure text
        # differs.
        cases = (
            (non_working_cmd, "No response for 64-char ljust command"),
            (working_cmd, "No response for 65-char command"),
        )
        for cmd, timeout_message in cases:
            writer.write(cmd + "\n")
            await writer.drain()
            try:
                response = await asyncio.wait_for(reader.readline(), timeout=1.0)
            except asyncio.TimeoutError:
                pytest.fail(timeout_message)
            assert "OK" in response
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
async def test_bytes_integrity_at_64_boundary(bind_host, unused_tcp_port):
    """Verify exact byte integrity around 64-character boundary.

    Check that no bytes are lost, duplicated, or corrupted at the boundary.

    For each length from 62 to 66, a string of distinct printable ASCII
    characters is sent to a raw recording/echoing TCP peer, and the bytes
    that peer received must contain the encoded string.
    """
    received_bytes = bytearray()

    class ByteRecorderProtocol(asyncio.Protocol):
        """Raw TCP peer that records incoming bytes and writes them back."""

        def connection_made(self, transport):
            self.transport = transport

        def data_received(self, data):
            received_bytes.extend(data)
            self.transport.write(data)

    loop = asyncio.get_running_loop()
    server = await loop.create_server(ByteRecorderProtocol, bind_host, unused_tcp_port)
    writer = None
    try:
        _, writer = await telnetlib3.open_connection(
            host=bind_host,
            port=unused_tcp_port,
            connect_minwait=0.05,
            connect_maxwait=0.05,
        )

        for boundary in [62, 63, 64, 65, 66]:
            received_bytes.clear()
            # Cycle through printable ASCII 33..126 so each position is
            # distinguishable from its neighbours.
            test_data = "".join(chr((i % 94) + 33) for i in range(boundary))
            expected = test_data.encode("utf-8")
            writer.write(test_data)
            await writer.drain()

            # Poll until the peer has the data (or the deadline passes)
            # rather than relying on a fixed sleep.
            deadline = loop.time() + 1.0
            while expected not in received_bytes:
                if loop.time() >= deadline:
                    break
                await asyncio.sleep(0.01)

            assert expected in received_bytes
    finally:
        # Close the client and the listening socket even on failure.
        if writer is not None:
            writer.close()
        server.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment