Created
March 6, 2018 11:02
-
-
Save litnimax/888d67e82917bb253bcd2b5cddfc54e8 to your computer and use it in GitHub Desktop.
MQTT RPC test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import logging
from mqttrpc import MQTTRPCServer, dispatcher
logging.basicConfig(level=logging.INFO)
class TestMQTTRPCServer(MQTTRPCServer):
    """MQTT RPC server used to exercise an RPC round trip.

    Exposes one public RPC method, ``test``, and a helper coroutine,
    ``run_test``, that calls that method through a proxy.
    """

    @dispatcher.public
    def test(a):
        """Return the argument unchanged.

        NOTE(review): declared without ``self`` exactly as in the original;
        presumably ``dispatcher.public`` registers it as a plain function --
        confirm against the mqttrpc documentation before changing it.
        """
        return a

    async def run_test(self, i):
        """Call ``test`` through the proxy for ``'test'`` and return the reply.

        Args:
            i: Value passed to the remote ``test`` call.

        Returns:
            Whatever ``proxy.test(i)`` returns, or ``None`` when obtaining
            the proxy or making the call raises an ``Exception``.
        """
        # Do not issue the call before the client reports it is connected.
        # NOTE(review): ``_connected_state`` is a private attribute of the
        # client object; verify it still exists when upgrading the library.
        await self.client._connected_state.wait()
        try:
            proxy = self.get_proxy_for('test')
            return await proxy.test(i)
        except Exception:
            # Still best-effort (callers rely on the ``None`` result), but
            # record why the call failed instead of discarding the error.
            logging.exception('RPC call test(%r) failed', i)
            return None
# Driver: start the server, fire 100 concurrent RPC calls at it, then report
# how many calls succeeded and how long the batch took.
loop = asyncio.get_event_loop()
server = TestMQTTRPCServer('mqtt://test:test@localhost', loop=loop)
loop.create_task(server.serve_forever())
started = loop.time()
# asyncio.wait() rejects bare coroutines on Python 3.11+ (deprecated since
# 3.8), so schedule each call as a Task on the loop first.
tasks = [loop.create_task(server.run_test(i)) for i in range(0, 100)]
results, _ = loop.run_until_complete(asyncio.wait(tasks))
# run_test() returns None on failure. Compare against None rather than
# truthiness so the valid echo of 0 (from i == 0) is counted as a success.
print('Count: ', sum(1 for k in results if k.result() is not None))
print('Time spent: ', int(loop.time() - started))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment