Last active
April 6, 2020 06:46
-
-
Save nobbynobbs/31c36d8fca5da7b2172d1da1d856bffe to your computer and use it in GitHub Desktop.
py27, twisted, autobahn
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals | |
from __future__ import print_function | |
from __future__ import absolute_import | |
import base64 | |
import pytest | |
import pytest_twisted | |
from autobahn.twisted.websocket import create_client_agent | |
from twisted.internet import reactor | |
from twisted.internet.protocol import Factory | |
from twisted.internet import task | |
from twisted.protocols.basic import LineReceiver | |
class HelloServerProtocol(LineReceiver):
    """Echo protocol wrapped by the WebSocket server factory in these tests.

    The instance switches itself to raw mode on construction, so incoming
    data is handed to ``rawDataReceived``. ``lineReceived`` is kept so the
    protocol still echoes if it is ever switched back to line mode. Every
    received chunk is printed and written back with ``sendLine``.
    """

    def __init__(self):
        # Ask LineReceiver to deliver unparsed chunks instead of lines.
        self.setRawMode()

    def connectionMade(self):
        """Print the local and the remote address of the new connection."""
        print(
            "connectionMade", self.transport.getHost(),
            self.transport.getPeer())

    def lineReceived(self, data):
        """Log and echo one complete line (only used in line mode)."""
        self._echo("lineReceived", data)

    def rawDataReceived(self, data):
        """Log and echo one raw chunk (the mode selected in ``__init__``)."""
        self._echo("rawDataReceived", data)

    def _echo(self, event, data):
        """Print ``data`` labelled with ``event`` and send it back.

        The bytes are decoded for logging only. Invalid or truncated UTF-8
        is replaced rather than raised, so a chunk that is binary or that
        splits a multi-byte character cannot prevent the echo from being
        sent. The echoed payload itself is the untouched ``data``.
        """
        print("{}: {}".format(event, data.decode("utf-8", "replace")))
        self.sendLine(data)
@pytest.fixture(scope="session")
def get_port():
    """Provide a callable that returns a TCP port number that is free now.

    The returned function binds a throw-away socket to port 0 so the OS
    picks an unused port, reads the chosen number and closes the socket.

    NOTE: the port is released before the caller binds it, so another
    process could in principle grab it in between.
    """
    def _find_free_port():
        import socket
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.bind(("0.0.0.0", 0))
            return probe.getsockname()[1]
        finally:
            # Always release the probe socket, even if bind() or
            # getsockname() raises; otherwise the descriptor would leak.
            probe.close()
    return _find_free_port
@pytest.fixture()
def port(get_port):
    """Start the echo server behind a WebSocket wrapper and yield its port.

    A ``WrappingWebSocketServerFactory`` (compression disabled) is put in
    front of a plain factory for ``HelloServerProtocol`` and bound to a
    free port obtained from ``get_port``. The value yielded is the object
    returned by ``reactor.listenTCP``.

    On teardown the port is stopped and the fixture waits until the stop
    has completed, so the listening socket is closed before the next test
    starts.
    """
    from autobahn.twisted.websocket import WrappingWebSocketServerFactory
    from twisted.internet import defer

    wrapped_factory = Factory.forProtocol(HelloServerProtocol)
    free_port = get_port()
    factory = WrappingWebSocketServerFactory(
        wrapped_factory,
        "ws://127.0.0.1:{}".format(free_port),
        enableCompression=False,
    )
    listening_port = reactor.listenTCP(free_port, factory)
    yield listening_port
    # stopListening() may return a Deferred or None; maybeDeferred
    # normalises both so the teardown can block until the port is closed.
    pytest_twisted.blockon(defer.maybeDeferred(listening_port.stopListening))
@pytest.fixture()
def client(port):
    """Return a WebSocket client connected to the server from ``port``.

    The client is opened against ``localhost`` on the port the server is
    listening on, requesting the ``base64`` subprotocol. The fixture blocks
    until both the ``open`` call and the client's ``is_open`` have
    completed.
    """
    ws_agent = create_client_agent(reactor)
    server_port = port.getHost().port
    connection = pytest_twisted.blockon(
        ws_agent.open(
            "ws://localhost:{}/".format(server_port),
            options={"protocols": ["base64"]},
        )
    )
    # Wait until the connection reports itself open before handing it out.
    pytest_twisted.blockon(connection.is_open)
    return connection
@pytest_twisted.inlineCallbacks
def test_echo(client):
    """Send one base64-encoded message and expect the same payload back.

    The last message delivered to the ``message`` handler is decoded from
    base64 and stripped of surrounding whitespace (presumably the line
    delimiter the echo server appends) before being compared with what
    was sent.
    """
    payload = b"i am a message"
    inbox = {}

    def got_message(msg, is_binary):
        print("on_message: args={} kwargs={}".format(msg, is_binary))
        # Only the most recent message is kept, under key 0.
        inbox[0] = msg

    # set handler, send message
    client.on("message", got_message)
    client.sendMessage(base64.b64encode(payload))

    # boilerplate, want to move it into the fixture teardown section, but can't
    # Yield to the reactor once, then close and wait for the close to finish.
    yield task.deferLater(reactor, 0, lambda: None)
    client.sendClose(code=1000, reason="byebye")
    yield client.is_closed

    # assertion
    echoed = base64.b64decode(inbox[0]).strip()
    assert payload == echoed
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment