Skip to content

Instantly share code, notes, and snippets.

@ktosiu
Forked from jander/echo.py
Last active August 29, 2015 14:08
Show Gist options
  • Save ktosiu/35e34edac86889af1234 to your computer and use it in GitHub Desktop.
Save ktosiu/35e34edac86889af1234 to your computer and use it in GitHub Desktop.
####################################
# echo.py
####################################
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, division
import gevent
from gevent.pool import Pool
from gevent.server import StreamServer
def echo(socket, address):
    '''Echo every chunk received on ``socket`` back to the peer.

    The loop ends when the peer closes the connection (``recv`` returns an
    empty value) or sends a chunk that is exactly ``quit`` once surrounding
    whitespace is stripped. The socket is always closed before returning,
    including when ``recv`` or ``sendall`` raises.

    socket  -- connected socket-like object (``recv``/``sendall``/``close``)
    address -- ``(host, port)`` pair of the peer, used only for logging
    '''
    print('New connection from %s:%s' % address)
    try:
        while True:
            data = socket.recv(1024)
            if not data:
                print('client disconnected')
                break
            # recv() yields bytes; a bytes literal matches on both Python 2
            # and Python 3, whereas a text literal never equals bytes on 3.
            if data.strip() == b'quit':
                print('client quit')
                break
            socket.sendall(data)
    finally:
        # Release the descriptor even if the peer resets the connection.
        socket.close()
def main(port, handle):
    '''Serve ``handle`` over TCP on all interfaces at ``port`` until stopped.

    port   -- TCP port number to listen on
    handle -- callable invoked as ``handle(socket, address)`` per connection
    '''
    # Do not accept more than 10000 connections at once.
    connection_pool = Pool(10000)
    listen_on = ('0.0.0.0', port)
    tcp_server = StreamServer(listen_on, handle,
                              spawn=connection_pool, backlog=2000)
    print('Starting tcp server on port %d' % port)
    try:
        tcp_server.serve_forever()
    finally:
        # Runs on any exit from serve_forever(), including an interrupt.
        tcp_server.close()
if __name__ == '__main__':
    # Script entry point: run the echo handler on TCP port 5000.
    main(port=5000, handle=echo)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, division
import gevent
import gevent.socket
from gevent.pool import Pool
from datetime import datetime
import sys
import StringIO
import traceback
MESSAGE = 'hello world'


def visit_echo_server(server_address):
    '''Send MESSAGE to the echo server once and time the round trip.

    server_address -- ``(host, port)`` pair passed to ``create_connection``

    Returns the elapsed time in whole microseconds between just after the
    connection is established and receipt of the reply. On any failure
    (connection error, mismatching reply, ...) the error is recorded through
    ``catch_error()`` and 0 is returned; callers count 0 as an error.
    '''
    # Sockets carry bytes; encode once so send and compare use the same type.
    payload = MESSAGE.encode('utf-8')
    client = None
    try:
        client = gevent.socket.create_connection(server_address)
        start = datetime.now()
        client.sendall(payload)
        gevent.sleep(0)  # let other greenlets run before waiting for the reply
        data = client.recv(1024)
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if data != payload:
            raise AssertionError('unexpected echo reply: %r' % (data,))
        elapsed = datetime.now() - start
        # timedelta.microseconds is only the sub-second component; fold in
        # days and seconds so round trips of one second or more are not
        # under-reported.
        return ((elapsed.days * 86400 + elapsed.seconds) * 1000000
                + elapsed.microseconds)
    except Exception:
        # Not a bare ``except``: KeyboardInterrupt/SystemExit must propagate.
        catch_error()
        return 0
    finally:
        # Close the connection; otherwise every request leaks a descriptor.
        if client is not None:
            client.close()
def test_echo_server(request_count, concurrency_count, server_address):
    '''Benchmark the echo server and print a report.

    request_count     -- total number of requests to issue
    concurrency_count -- size of the greenlet pool running the requests
    server_address    -- address of the server under test
    '''
    # One target entry per request; every request hits the same address.
    targets = [server_address] * request_count
    workers = Pool(concurrency_count)
    timings = workers.map(visit_echo_server, targets)
    report(timings, request_count, concurrency_count)
ERRORS = []


def catch_error():
    '''Append the traceback of the exception being handled to ERRORS.

    Must be called from inside an ``except`` block. Only the first five
    tracebacks are kept; collecting more of them adds no useful information.
    '''
    if len(ERRORS) < 5:
        # format_exc() returns exactly what print_exc() would write, without
        # needing a temporary (Python-2-only) StringIO buffer.
        ERRORS.append(traceback.format_exc())
def report_slice(datas, start, end):
    '''Build one report row summarising a group of timings.

    datas -- sequence of per-request timings; a value of 0 marks a failure
    start -- first index of the group, used only in the row label
    end   -- last index of the group, used only in the row label

    Returns ``[label, min, mean, max, error_count]`` as strings. Failed
    requests (zeros) are included in min/mean/max. An empty group yields a
    row of zeros instead of raising.
    '''
    label = 'request[%d:%d]' % (start, end)
    if not datas:
        # min()/max() and the division below all fail on an empty sequence.
        return [label, repr(0), repr(0), repr(0), repr(0)]
    lowest = min(datas)
    highest = max(datas)
    mean = sum(datas) // len(datas)
    error_count = sum(1 for value in datas if value == 0)
    return [label, repr(lowest), repr(mean), repr(highest), repr(error_count)]
def print_table(table):
    '''Print a table of strings as left-aligned, space-padded columns.

    table -- list of rows, each row a list of strings. Rows normally have
             the same length; shorter or longer rows are tolerated.

    Every cell is padded to its column's widest value plus four spaces.
    An empty table prints nothing.
    '''
    if not table:
        return
    # Widest cell per column, growing the list as longer rows appear.
    widths = []
    for row in table:
        for col, cell in enumerate(row):
            if col == len(widths):
                widths.append(len(cell))
            elif len(cell) > widths[col]:
                widths[col] = len(cell)
    lines = [
        ''.join(cell.ljust(widths[col] + 4) for col, cell in enumerate(row))
        for row in table
    ]
    print('\n'.join(lines))
def report(results, request_count, concurrency_count):
    '''Print the benchmark report.

    results           -- per-request timings, one entry per request, where
                         0 marks a failed request
    request_count     -- total number of requests issued
    concurrency_count -- concurrency level used, echoed in the header

    Prints a header, one row per slice of 1000 requests (10000 once
    ``request_count`` reaches 100000), a final row covering all results,
    and then any tracebacks collected in ERRORS. Row labels use inclusive
    indices, e.g. ``request[0:999]``.
    '''
    # Report header.
    print('request: %d\nconcurrency: %d' % (request_count, concurrency_count))
    table = [['', 'min', 'mean', 'max', 'errors']]

    slice_base = 10000 if request_count >= 100000 else 1000

    # One row per slice. Stepping by slice_base also covers the trailing
    # partial slice and the case where there are fewer requests than one
    # slice. The Python slice end is exclusive, so every sample is included.
    for start in range(0, request_count, slice_base):
        stop = min(start + slice_base, request_count)
        table.append(report_slice(results[start:stop], start, stop - 1))

    # Summary row over all results; skipped when there is nothing to
    # summarise.
    if results:
        table.append(report_slice(results, 0, -1))
    print_table(table)

    # Collected tracebacks, if any.
    if ERRORS:
        print('Errors:')
        print('\n------------------------------\n'.join(ERRORS))
if __name__ == '__main__':
    # Script entry point: one million requests, 5000 at a time, against the
    # echo server on localhost:5000.
    print('Start testing...')
    test_echo_server(
        request_count=1000000,
        concurrency_count=5000,
        server_address=('localhost', 5000),
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment