python_接⼝⾃动化测试框架
本⽂总结分享介绍接⼝测试框架开发,环境使⽤python3+selenium3+unittest+ddt+requests测试框架及ddt数据驱动,采⽤Excel管理测试⽤例等集成测试数据功能,以及使⽤HTMLTestRunner来⽣成测试报告,⽬前有开源的poman、Jmeter等接⼝测试⼯具,为什么还要开发接⼝测试框架呢?因接⼝测试⼯具也有存在⼏点不⾜。
测试数据不可控制。⽐如接⼝返回数据不可控,就⽆法⾃动断⾔接⼝返回的数据,不能断定是接⼝程序引起,还是测试数据变化引起的错误,所以需要做⼀些初始化测试数据。接⼝⼯具没有具备初始化测试数据功能,⽆法做到真正的接⼝测试⾃动化。
⽆法测试加密接⼝。实际项⽬中,多数接⼝不是可以随便调⽤,⼀般情况⽆法摸拟和⽣成加密算法。如时间戳和MDB加密算法,⼀般接⼝⼯具⽆法摸拟。
扩展能⼒不⾜。开源的接⼝测试⼯具⽆法实现扩展功能。⽐如,我们想⽣成不同格式的测试报告,想将测试报告发送到指定邮箱,⼜想让接⼝测试集成到CI中,做持续集成定时任务。测试框架处理流程
测试框架处理过程如下:
1. ⾸先初始化清空数据库表的数据,向数据库插⼊测试数据;
2. 调⽤被测试系统提供的接⼝,先数据驱动读取excel⽤例⼀⾏数据;
3. 发送请求数据,根据传参数据,向数据库查询得到对应的数据;
4. 将查询的结果组装成JSON格式的数据,同时根据返回的数据值与Excel的值对⽐判断,并写⼊结果⾄指定Excel测试⽤例表格;
5. 通过单元测试框架断⾔接⼝返回的数据,并⽣成测试报告,最后把⽣成最新的测试报告HTML⽂件发送指定的邮箱。
测试框架结构⽬录介绍
⽬录结构介绍如下:
config/: ⽂件路径配置
database/: 测试⽤例模板⽂件及数据库和发送邮箱配置⽂件
db_fixture/: 初始化接⼝测试数据
lib/: 程序核⼼模块。包含有excel解析读写、发送邮箱、发送请求、⽣成最新测试报告⽂件
package/: 存放第三⽅库包。如HTMLTestRunner,⽤于⽣成HTML格式测试报告
report/: ⽣成接⼝⾃动化测试报告
testcase/: ⽤于编写接⼝⾃动化测试⽤例
run_demo.py: 执⾏所有接⼝测试⽤例的主程序
GitHub项⽬地址:
数据库封装
1 [tester]
2 name = Jason
3
4 [mysqlconf]
5 host = 127.0.0.1
6 port = 3306
7 user = root
8 password = 123456
9 db_name = guest
10
11 [user]
12# 发送邮箱服务器
13 HOST_SERVER = smtp.163
14# 邮件发件⼈
15 FROM = 111@163
16# 邮件收件⼈
17 TO = 222@126
18# 发送邮箱⽤户名/密码
19 user = aaa
20 password = aaa
21# 邮件主题
windows10运行的快捷键
22 SUBJECT = 发布会系统接⼝⾃动化测试报告
config.ini
3__author__ = 'YinJia'
4
5import os,sys
6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
7from config import setting
8from pymysql import connect,cursors
import OperationalError
10import configparser as cparser
11
12# --------- 读取config.ini配置⽂件 ---------------
13 cf = cparser.ConfigParser()
ad(setting.TEST_CONFIG,encoding='UTF-8')
15 host = cf.get("mysqlconf","host")
16 port = cf.get("mysqlconf","port")
17 user = cf.get("mysqlconf","user")
18 password = cf.get("mysqlconf","password")
19 db = cf.get("mysqlconf","db_name")
20
21class DB:
22"""
23 MySQL基本操作
24"""
25def__init__(self):
26try:
27# 连接数据库
28 = connect(host = host,
29 user = user,
30 password = password,
31 db = db,
32 charset = 'utf8mb4',
33 cursorclass = cursors.DictCursor
34 )
35except OperationalError as e:
36print("Mysql Error %d: %s" % (e.args[0],e.args[1]))
37
38# 清除表数据
39def clear(self,table_name):
40 real_sql = "delete from " + table_name + ";"
41 ursor() as cursor:
42# 取消表的外键约束
43 ute("SET FOREIGN_KEY_CHECKS=0;")
44 ute(real_sql)
45 it()
46
47# 插⼊表数据
48def insert(self, table_name, table_data):
49for key in table_data:
50 table_data[key] = "'"+str(table_data[key])+"'"
51 key = ','.join(table_data.keys())
52 value = ','.join(table_data.values())
53 real_sql = "INSERT INTO " + table_name + " (" + key + ") VALUES (" + value + ")"
54
55 ursor() as cursor:
56 ute(real_sql)
57 it()
58
59# 关闭数据库
60def close(self):
61 lose()
62
63# 初始化数据
64def init_data(self, datas):
65for table, data in datas.items():
66 self.clear(table)
67for d in data:
68 self.insert(table, d)
69 self.close()
mysql_db.py
1#!/usr/bin/env python
2# _*_ coding:utf-8 _*_
3__author__ = 'YinJia'
4
5import sys, time, os
6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
7from sql_db import DB
8
9# 定义过去时间
10 past_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()-100000))
11# 定义将来时间
12 future_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()+10000))
13
14# 创建测试数据
15 datas = {
16# 发布会表数据
17'sign_event':[
18 {'id':1,'name':'红⽶Pro发布会','`limit`':2000,'status':1,'address':'北京会展中⼼','start_time':future_time},
19 {'id':2,'name':'苹果iphon6发布会','`limit`':1000,'status':1,'address':'宝安体育馆','start_time':future_time},
20 {'id':3,'name':'华为荣耀8发布会','`limit`':2000,'status':0,'address':'深圳福⽥会展中⼼','start_time':future_time},
21 {'id':4,'name':'苹果iphon8发布会','`limit`':2000,'status':1,'address':'深圳湾体育中⼼','start_time':past_time},
22 {'id':5,'name':'⼩⽶5发布会','`limit`':2000,'status':1,'address':'北京国家会议中⼼','start_time':future_time},
23 ],
26 {'id':1,'realname':'Tom','phone':135********,'email':'alen@mail','sign':0,'event_id':1},
27 {'id':2,'realname':'Jason','phone':135********,'email':'sign@mail','sign':1,'event_id':1},
28 {'id':3,'realname':'Jams','phone':135********,'email':'tom@mail','sign':0,'event_id':5},
29 ],
30 }
31
32# 测试数据插⼊表
33def init_data():
34 DB().init_data(datas)
test_data.py
1#!/usr/bin/env python
2# _*_ coding:utf-8 _*_
3__author__ = 'YinJia'
4
5import os,sys
6 BASE_DIR = os.path.dirname(os.path.dirname(__file__))
7 sys.path.append(BASE_DIR)
8
9# 配置⽂件
10 TEST_CONFIG = os.path.join(BASE_DIR,"database","config.ini")
11# 测试⽤例模板⽂件
12 SOURCE_FILE = os.path.join(BASE_DIR,"database","DemoAPITestCase.xlsx")
13# excel测试⽤例结果⽂件
14 TARGET_FILE = os.path.join(BASE_DIR,"report","excelReport","DemoAPITestCase.xlsx")
15# 测试⽤例报告
16 TEST_REPORT = os.path.join(BASE_DIR,"report")
17# 测试⽤例程序⽂件
18 TEST_CASE = os.path.join(BASE_DIR,"testcase")
setting.py
程序核⼼模块
1#!/usr/bin/env python
2# _*_ coding:utf-8 _*_
3__author__ = 'YinJia'
4
5import os
6
7def new_report(testreport):
8"""
9⽣成最新的测试报告⽂件
10 :param testreport:
11 :return:返回⽂件
12"""
13 lists = os.listdir(testreport)
14 lists.sort(key=lambda fn: ime(testreport + "\\" + fn))
15 file_new = os.path.join(testreport,lists[-1])
16return file_new
netReport.py
1#!/usr/bin/env python
2# _*_ coding:utf-8 _*_
3__author__ = 'YinJia'
4
5import xlrdstruct node next什么意思
6
7class ReadExcel():
8"""读取excel⽂件数据"""二维数组的行列怎么表示
9def__init__(self,fileName, SheetName="Sheet1"):
10 self.data = xlrd.open_workbook(fileName)
11 self.table = self.data.sheet_by_name(SheetName)
12
13# 获取总⾏数、总列数
14 ws = ws
15 ls = ls
python请求并解析json数据16def read_data(self):
ws > 1:
18# 获取第⼀⾏的内容,列表格式
19 keys = w_values(0)
20 listApiData = []
21# 获取每⼀⾏的内容,列表格式
22for col in range(1, ws):
23 values = w_values(col)
24# keys,values组合转换为字典
25 api_dict = dict(zip(keys, values))
26 listApiData.append(api_dict)
27return listApiData
28else:
29print("表格是空数据!")
30return None
readexcel.py
1#!/usr/bin/env python
2# _*_ coding:utf-8 _*_
3__author__ = 'YinJia'
4
6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
7
8
9class SendRequests():
10"""发送请求数据"""
11def sendRequests(self,s,apiData):
12try:
13#从读取的表格中获取响应的参数作为传递
14 method = apiData["method"]
15 url = apiData["url"]
16if apiData["params"] == "":
17 par = None
18else:
19 par = eval(apiData["params"])
20if apiData["headers"] == "":
21 h = None
22else:
23 h = eval(apiData["headers"])
24if apiData["body"] == "":
25 body_data = None
矿物质属于常量元素的是26else:
27 body_data = eval(apiData["body"])
28 type = apiData["type"]
29 v = False
30if type == "data":
31 body = body_data
32elif type == "json":
33 body = json.dumps(body_data)
34else:
textarea的cols和rows35 body = body_data
36
37#发送请求
38 re = s.request(method=method,url=url,headers=h,params=par,data=body,verify=v) 39return re
40except Exception as e:
41print(e)
sendrequests.py
1#!/usr/bin/env python
2# _*_ coding:utf-8 _*_
3__author__ = 'YinJia'
4
5import os,sys
6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
7from config import setting
8import smtplib
wReport import new_report
10import configparser
11from import MIMEText
12from email.mime.multipart import MIMEMultipart
13
14
15def send_mail(file_new):
16"""
17定义发送邮件
18 :param file_new:
19 :return: 成功:打印发送邮箱成功;失败:返回失败信息
20"""
21 f = open(file_new,'rb')
22 mail_body = f.read()
23 f.close()
24#发送附件
25 con = configparser.ConfigParser()
26 ad(setting.TEST_CONFIG,encoding='utf-8')
27 report = new_report(setting.TEST_REPORT)
28 sendfile = open(report,'rb').read()
29# --------- 读取config.ini配置⽂件 ---------------
30 HOST = ("user","HOST_SERVER")
31 SENDER = ("user","FROM")
32 RECEIVER = ("user","TO")
33 USER = ("user","user")
34 PWD = ("user","password")
35 SUBJECT = ("user","SUBJECT")
36
37 att = MIMEText(sendfile,'base64','utf-8')
38 att["Content-Type"] = 'application/octet-stream'
39 att.add_header("Content-Disposition", "attachment", filename=("gbk", "", report))
40
41 msg = MIMEMultipart('related')
42 msg.attach(att)
43 msgtext = MIMEText(mail_body,'html','utf-8')
44 msg.attach(msgtext)
45 msg['Subject'] = SUBJECT
46 msg['from'] = SENDER
47 msg['to'] = RECEIVER
48
49try:
50 server = smtplib.SMTP()
51 t(HOST)
52 server.starttls()
53 server.login(USER,PWD)
54 server.sendmail(SENDER,RECEIVER,msg.as_string())
55 server.quit()
57except Exception as e:
58print("失败: " + str(e))
sendmail.py
1#!/usr/bin/env python
2# _*_ coding:utf-8 _*_
3__author__ = 'YinJia'
4
5import os,sys
6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
7import shutil
8from config import setting
9from openpyxl import load_workbook
10from openpyxl.styles import Font,Alignment
11from lors import RED,GREEN,DARKYELLOW 12import configparser as cparser
13
14# --------- 读取config.ini配置⽂件 ---------------
15 cf = cparser.ConfigParser()
ad(setting.TEST_CONFIG,encoding='UTF-8')
17 name = cf.get("tester","name")
18
19class WriteExcel():
20"""⽂件写⼊数据"""
21def__init__(self,fileName):
22 self.filename = fileName
23if not ists(self.filename):
24# ⽂件不存在,则拷贝模板⽂件⾄指定报告⽬录下
25 pyfile(setting.SOURCE_FILE,setting.TARGET_FILE)
26 self.wb = load_workbook(self.filename)
27 self.ws = self.wb.active
28
29def write_data(self,row_n,value):
30"""
31写⼊测试结果
32 :param row_n:数据所在⾏数
33 :param value: 测试结果值
34 :return: ⽆
35"""
36 font_GREEN = Font(name='宋体', color=GREEN, bold=True)
37 font_RED = Font(name='宋体', color=RED, bold=True)
38 font1 = Font(name='宋体', color=DARKYELLOW, bold=True)
39 align = Alignment(horizontal='center', vertical='center')
40# 获数所在⾏数
41 L_n = "L" + str(row_n)
42 M_n = "M" + str(row_n)
43if value == "PASS":
44 ll(row_n, 12, value)
45 self.ws[L_n].font = font_GREEN
46if value == "FAIL":
47 ll(row_n, 12, value)
48 self.ws[L_n].font = font_RED
49 ll(row_n, 13, name)
50 self.ws[L_n].alignment = align
51 self.ws[M_n].font = font1
52 self.ws[M_n].alignment = align
53 self.wb.save(self.filename)
writeexcel.py
接⼝测试⽤例编写
1#!/usr/bin/env python
2# _*_ coding:utf-8 _*_
3__author__ = 'YinJia'
4
5import os,sys
6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
7import unittest,requests,ddt
8from config import setting
adexcel import ReadExcel
10from lib.sendrequests import SendRequests
11from lib.writeexcel import WriteExcel
12
13 testData = ReadExcel(setting.SOURCE_FILE, "Sheet1").read_data() 14
15 @ddt.ddt
16class Demo_API(unittest.TestCase):
17"""发布会系统"""
18def setUp(self):
19 self.s = requests.session()
20
21def tearDown(self):
22pass
23
24 @ddt.data(*testData)
25def test_api(self,data):
26# 获取ID字段数值,截取结尾数字并去掉开头0
27 rowNum = int(data['ID'].split("_")[2])
28# 发送请求
29 re = SendRequests().sendRequests(self.s,data)
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论