python脚本测试——接⼝⾼并发场景⼀、环境准备。
1、下载安装python
2,安装教程请⾃⾏百度
3、IDEA在file–settings–Plugins中安装Python
⼆、复⽤需要修改的地⽅:
1、url路径
2、token信息
3、连接数据库的信息
4、sql语句
5、请求⽅式:get、post、delete、put
6、⽤户username的特殊性规则
7、并发线程数:concurrent_thread_count
8、请求数据在RequestBody中还是在RequestParam中
三、添加学⽣。
1、设计思路
设置线程数,作为循环次数,特殊值+i作为学号,性别取1、2的随机数
2、实现代码如下
# 并发测试框架//添加学⽣
import pymysql
import requests,threading
import random
# 接⼝路径和token
data ={
"url":"。。。/base/add",
"header":{
"X-Access-Token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9."
},
}
# 并发线程数
concurrent_thread_count =1000
# 每个线程中循环测试接⼝的次数
ONE_WORKER_NUM =1
# 测试代码
def test(username):
random.randint(1,2)
# 获取试题接⼝
# 参数在请求体中时,使⽤bodys
bodys ={
# 请求体参数
"num": username,
"name":"随删数据",
"gender": random.randint(1,2)
}
print(bodys)
response = requests.post(data["url"], json=bodys, headers=data["header"])
# response = (data["url"]+'?userId='+resultSql[0], headers=data["header"])
if response.status_code ==200:
result = t.decode('utf-8')
else:
result ="访问失败"
print(threading.current_thread(), result)
# 单个线程,可以设置⼀个线程访问⼏次接⼝
def working(username):
global ONE_WORKER_NUM
for i in range(0, ONE_WORKER_NUM):
test(username)
def t():
Threads =[]
# 根据数据的数量当作循环次数,每次创建⼀个线程
for i in range(concurrent_thread_count):
username =111111000+i
t = threading.Thread(target=working, args=(username,))
t.setDaemon(True)
Threads.append(t)
for t in Threads:
t.start()
for t in Threads:
t.join()
if __name__=="__main__":
t()
四、获取试题。
1、设计思路如下
1、第⼀步:先执⾏查询数据库操作,查询出来⼀共有多少条符合条件的数据;数据总数作为循环的次数,每次循环起⼀个线程,线程中带有参数username,根据添加⽤户时账号的规律设置username = “”"’%s’"""%(111111000+i)当作参数传递
2、第⼆步:每个线程中可以设置循环次数,每次调⽤具体⽅法
3、第三步:测试代码部分,根据传递的username作为条件查询⽤户id;获取到⽤户id后当作参数去访问接⼝,打印返回值2、实现代码如下(使⽤多个不同的⽤户id做参数)
# 并发测试框架
import pymysql
import requests,threading
# 接⼝路径和token
data ={
"url":"。。。/mobile/getAll",
"header":{
"X-Access-Token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9"
},
}
# 每个线程中循环测试接⼝的次数
ONE_WORKER_NUM =1
# 测试代码
def test(username):
# 根据username查询id
conn = t(
host ="",
port =3306,
user ="root",
passwd ="...",
db="jeecg"
)
cur = conn.cursor()
sql ="SELECT id FROM user WHERE username ="+username
# print(sql)
resultSql = cur.fetchone()
# 获取试题接⼝
# 参数在请求体中时,使⽤bodys
# bodys = {
# # 请求体参数
# "userId": resultSql[0]
# }
# response = (data["url"]+'?userId='+resultSql[0], json=bodys, headers=data["header"])
print(data["url"]+'?userId='+resultSql[0])
response = (data["url"]+'?userId='+resultSql[0], headers=data["header"])
if response.status_code ==200:
result = t.decode('utf-8')
else:
result ="访问失败"
print(threading.current_thread(), result)
# 单个线程,可以设置⼀个线程访问⼏次接⼝
def working(username):
global ONE_WORKER_NUM
for i in range(0, ONE_WORKER_NUM):
test(username)
def t():
# 查询符合条件的数据总数
conn = t(
host ="",
port =3306,
user ="root",
passwd ="...",
db="jeecg"
)
cur = conn.cursor()
sql ="SELECT COUNT(*) FROM user WHERE username LIKE '11111%' "
sql ="SELECT COUNT(*) FROM user WHERE username LIKE '11111%' "
# 打印SQL
# print(sql)
result = cur.fetchone()
# 打印循环次数
print(result[0])
Threads =[]
# 根据数据的数量当作循环次数,每次创建⼀个线程
for i in range(result[0]):
username ="""'%s'"""%(111111000+i)
t = threading.Thread(target=working, args=(username,))
# 使⽤setDaemon()和守护线程这⽅⾯知识有关,⽐如在启动线程前设置thread.setDaemon(True),就是设置该线程为守护线程,# 表⽰该线程是不重要的,进程退出时不需要等待这个线程执⾏完成。
# 这样做的意义在于:避免⼦线程⽆限死循环,导致退不出程序,也就是避免楼上说的孤⼉进程。
#
# thread.setDaemon()设置为True, 则设为true的话则主线程执⾏完毕后会将⼦线程回收掉,
# 设置为false,主进程执⾏结束时不会回收⼦线程
# setDaemon()说明:
# setDaemon():设置此线程是否被主线程守护回收。默认False不回收,需要在 start ⽅法前调⽤;
# 设为True相当于像主线程中注册守护,主线程结束时会将其⼀并回收。
t.setDaemon(True)
# python中的append()⽅法⽤于在列表末尾添加新的对象。
Threads.append(t)
for t in Threads:
# 开始线程活动。
# 对每⼀个线程对象来说它只能被调⽤⼀次,它安排对象在⼀个另外的单独线程中调⽤run()⽅法(⽽⾮当前所处线程)。
# 当该⽅法在同⼀个线程对象中被调⽤超过⼀次时,会引⼊RuntimeError(运⾏时错误)。
t.start()
for t in Threads:
t.join()
if __name__=="__main__":
t()
五、删除学⽣。
1、设计思路
1、先查询数据库中有多少个需要删除的数据,作为循环次数,特殊值+i作为学号传到创建的线程⾥⾯
2、再根据学号查学⽣的id,查到id后去请求删除接⼝
2、代码实现如下
# 并发测试框架//删除学⽣
import pymysql
import requests,threading
# 接⼝路径和token
data ={
"url":"。。。/base/delete",
"header":{
"X-Access-Token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9"
},
}
# 每个线程中循环测试接⼝的次数
ONE_WORKER_NUM =1
# 测试代码
def test(username):
# 根据username查询id
conn = t(
host ="。。。",
port =3306,
user ="root",
user ="root",
passwd ="。。。",
db="jeecg"
)
cur = conn.cursor()
sql ="SELECT id FROM mes_stu WHERE num ="+username
# print(sql)
resultSql = cur.fetchone()
# 获取试题接⼝
# 参数在请求体中时,使⽤bodys
# bodys = {
# # 请求体参数
# "userId": resultSql[0]
# }
# response = (data["url"]+'?userId='+resultSql[0], json=bodys, headers=data["header"]) print(data["url"]+'?userId='+resultSql[0])
response = requests.delete(data["url"]+'?id='+resultSql[0], headers=data["header"])
if response.status_code ==200:
result = t.decode('utf-8')
else:
result ="访问失败"
print(threading.current_thread(), result)
# 单个线程,可以设置⼀个线程访问⼏次接⼝
def working(username):
global ONE_WORKER_NUM
for i in range(0, ONE_WORKER_NUM):
test(username)
def t():
# 查询符合条件的数据总数
conn = t(
host ="。。。",
port =3306,
user ="root",
passwd ="。。。",
db="jeecg"
)
cur = conn.cursor()
sql ="SELECT COUNT(*) FROM mes_stu WHERE num LIKE '11111%' "
# 打印SQL
# print(sql)
result = cur.fetchone()
# 打印循环次数
print(result[0])
Threads =[]
for i in range(result[0]):
# 根据学号规律拼接
username ="""'%s'"""%(111111000+i)
t = threading.Thread(target=working, args=(username,))
t.setDaemon(True)
Threads.append(t)
for t in Threads:
python新手代码useridt.start()
for t in Threads:
t.join()
if __name__=="__main__":
t()
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论