websocket长连接压⼒测试踩过的坑
Websocket协议压测记录
背景:
公司的⾏情系统是采⽤的websocket协议,有请求和订阅两种⽅式向服务器申请最新⾏情信息。请求⽅式是⼀次的,订阅⽅式是建⽴连接后,服务器定时向客户端推送⾏情信息。
初步测试⽅案:
因考虑到websocket是双⼯通讯,是长连接,并且本次压测的性能指标是系统能建⽴的最⼤连接数,并且是建⽴连接后服务器能持续向客户端推送⾏情信息。
基于以上原因考虑⽤python采⽤多线程建⽴连接,为了验证能否收到推送的信息,把返回的⾏情信息保存到⽂本⽂件中。Python脚本如下:
import websocket
import time
import threading
import gzip
#import json
#from threadpool import ThreadPool, makeRequests
#from websocket import create_connection
SERVER_URL = "ws://shell-dev/r1/main/ws"
#SERVER_URL = "wss://i.cg/wi/ws"
#SERVER_URL = "wss://shell/r1/main/ws"
def on_message(ws, message):
print(message)
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
def on_open(ws):
def send_trhead():
send_info = '{"sub": "husdt.kline.1min","id": "id10"}'
#send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'        while True:
#time.sleep(5)
#ws.send(json.dumps(send_info))
ws.send(send_info)
while (1):
compressData = ws.recv()
result = gzip.decompress(compressData).decode('utf-8')                if result[:7] == '{"ping"':
ts = result[8:21]
pong = '{"pong":' + ts + '}'
ws.send(pong)
ws.send(send_info)
else:
#print(result)
with open('./', 'a') as f:
f.write(threadin
g.currentThread().name+'\n')
f.write(result+'\n')
t = threading.Thread(target=send_trhead)
t.start()
print(threading.currentThread().name)
def on_start(a):
# time.sleep(2)
# ableTrace(True)
# ws = websocket.WebSocketApp(SERVER_URL,
#                            on_message=on_message,
#                            on_error=on_error,
#                            on_close=on_close)
# ws.on_open = on_open
# ws.run_forever()
#print(a[2])
try:
ws = ate_connection(SERVER_URL)
on_open(ws)
except Exception as e:
print('error is :',e)
print('connect ws ')
time.sleep(5)
if __name__ == "__main__":
# pool = ThreadPool(3)
# test = list()
# for ir in range(3):
#    test.append(ir)
#
# requests = makeRequests(on_start, test)
# [pool.putRequest(req) for req in requests]
# pool.wait()
# # #on_start(1)
for ir in range(20):
on_start(1)
time.sleep(0.1)
初步测试结果:
在压测的过程中,发现连接数达到⼀定程度(单机1400连接),连接就断掉了,监控发现压⼒机内存基本消耗光了,因建⽴连接,并接收返回的信息,随着连接数增加,内存消耗⼤,只能断开连接,释放内存。
调整测试⽅案:
和架构、开发讨论后,准备在websocket客户端采⽤AIO异步通讯⽅式增⼤压⼒,因当时是考虑到长连接未考虑这种⽅式,查询资料,发现websocket服务端可以采⽤AIO异步通讯⽅式,在websocket客户端尝试⼀下,采⽤locust + python的⽅式,也查了⼀些资料,发现⽅案可⾏。
Locust是⼀款可扩展的,分布式的,性能测试的,开源的,⽤Python编写的性能测试⼯具。对于测试HTTP协议的接⼝是⽐较⽅便的,但是它也⽀持测试别的协议的接⼝,不过需要重写Locust类。脚本如下:
from locust import Locust, events, task, TaskSet
import websocket
import time
import gzip
class WebSocketClient():
def __init__(self, host, port):
self.host = host
self.port = port
class WebSocketLocust(Locust):
def __init__(self, *args, **kwargs):
self.client = WebSocketClient("172.31.15.85", 9503)
class UserBehavior(TaskSet):
ws = websocket.WebSocket()
#t("ws://10.98.64.103:8807")
@task(1)
def buy(self):
try:
start_time = time.time()
#self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')            #result = v()
send_info = '{"sub": "husdt.kline.1min","id": "id10"}'
# send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
while True:
# time.sleep(5)
# ws.send(json.dumps(send_info))
ws.send(send_info)
while (1):
compressData = ws.recv()
result = gzip.decompress(compressData).decode('utf-8')
if result[:7] == '{"ping"':
ts = result[8:21]
pong = '{"pong":' + ts + '}'
ws.send(pong)
ws.send(send_info)
else:
# print(result)
with open('./', 'a') as f:
#f.write(threading.currentThread().name + '\n')
f.write(result + '\n')
except Exception as e:
print("error is:",e)
class ApiUser(WebSocketLocust):
task_set = UserBehavior
min_wait = 100
max_wait = 200
websocket和socket⽤命令执⾏脚本:
Locust -f websocket_client_locust.py –-no-web -c 1 -r 1 -t 60s
单个⽤户执⾏成功,并能⽣成⽂件。但多个⽤户执⾏的时候就报错,报错信息如下:This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x7f01f0594900>>
错误原因说的是socke正在被使⽤,但是我的代码中是新的socket,简单分析了⼀下,应该不会出现问题,但是我的socek的使⽤部分是⼀个全局的client,然后程序运⾏的时候出现了上述错误.仔细推测我出了原因:
geven是个协程库,那么多个协程共⽤⼀个socek的时候就会出现上述错误了,于是我把socket改成了局部的,问题解决.
修改前:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。