python实战——针对抽奖系统奖池数据概率的计算
⼀、需求背景
抽奖活动统计奖池中各个奖项的发放概率
⼆、逻辑拆分
1、取当前时间整点时间在数据库中对应的poolid
#获取当前时间戳
hour_stamp = w().replace(minute=0, second=0, microsecond=0).timestamp()) #只取当前时间段的整点时间,向上取整,分钟与秒均替换成00 print("当前时间戳:", hour_stamp)
#查询pooId
def select_pooId():
sql_response = ute("SELECT id FROM draw_pool WHERE startHour = %s" %hour_stamp)
repare = cursor.fetchone()
if repare == None:
print("数据库数据未刷新,获取不到数据,请联系狗逼开发")
else:
ren = repare[0]
print("当前时间段对应的pooid为:", ren)
return ren
pooId = select_pooId()
2、清空⽤户id对应残留数据,保证数据准确性
sql_format = ute("DELETE FROM card_user_record WHERE userId = %s" %userId)
cnnmit()
print("数据表格式化成功")
3、批量运⾏接⼝1000次,保证缩⼩概率偏差
#批量运⾏定时红包接⼝
num = 1000
Pointer = 0
while running:
if Pointer == num:
running = False
print("已成功运⾏%s次" %Pointer)
else:
Interface_response = Interface()
Pointer = Pointer + 1
print(Interface_response)
4、加⼊等待时间,基于系统的抗压能⼒保证接⼝运⾏成功率
time.sleep(1)
sys.stdout.flush()  #强制刷新缓存区
5、接⼝运⾏结束后,对表中数据进⾏分组,分类查询,计算出各个奖励的数量
#循环查询sql并打印
base_list = (1,2,3,4,5,6,7,8,9,10)  #对应奖励类型,1 cpa.cpl 通⽤  2 cpa 通⽤  3 cpa试玩  4 cpa 截图  5 cpa 答题  6 cpa ASO  7 所有cpl任务  8  cpl 所有充值规则for base in base_list:
sql_values = ute(sql.format(userId,base)) #引⼊全局sql,拼接变量进⾏传参
repare = cursor.fetchall()
result_list = list(chain.from_iterable(repare))  #游标输出内容为⼆维元组,需将⼆维元组转换为⼀维元组后计算
print(result_list[0],end=',')
6、最后基于(各个奖励数量/发放总奖励数量*100)得出各个奖励出现的概率
#计算概率
for probability in result_list:
calculation = ('persent:{:.2f}%'.format(probability/result_all*100))
print(calculation,end=',  ')
三、代码实现
# coding=UTF-8
tor
import requests,json
from datetime import datetime
import time
import sys
running = True
userId = 10002142
sql = "SELECT COUNT(id) FROM card_user_record WHERE userId = {0} AND useLimit = {1}"
from itertools import chain
#获取当前时间戳
hour_stamp = w().replace(minute=0, second=0, microsecond=0).timestamp()) #只取当前时间段的整点时间,向上取整,分钟与秒均替换成00
print("当前时间戳:", hour_stamp)
#连接数据库
config = {
'host':'47.123.22.33',
'user':'ne123t',
'password':'Ne123ED1',
'port':'3306',
'database':'c123nty',
'charset':'utf8'
}
#检查数据库状态
try:
cnn = t(**config)
print("数据库连接成功")
tor.Error as e:
print("数据库连接失败")
cursor = cnn.cursor(buffered=True)
#查询pooId
def select_pooId():
sql_response = ute("SELECT id FROM draw_pool WHERE startHour = %s" %hour_stamp)
repare = cursor.fetchone()
if repare == None:
print("数据库数据未刷新,获取不到数据,请联系狗逼开发")
else:
ren = repare[0]
print("当前时间段对应的pooid为:", ren)
python新手代码userid
return ren
pooId = select_pooId()
#清空数据表
#清空数据表
sql_format = ute("DELETE FROM card_user_record WHERE userId = %s" %userId)
cnnmit()
print("数据表格式化成功")
#定时红包接⼝
def Interface():
github_url = "crazyapitest.zhuoyixia/v1/draw/rand-reward"
header = {
'Content-Type':'application/json',
'mk':'o8293nHdiwnjNjksnv',
'isObj':'1',
'platform':'2',
'system':'{"uuid":"70494D22-3822-4BC8-A6EF-F5131F7737F3","ymId":"98d5e2d594bbe5f95ee9d437cc4fd2","channelId":"3","joinId":"1","idfa":"094FA6BC-4    data = {
"poolId":pooId
}
data_json = json.dumps(data)
r = requests.post(github_url,headers = header,data=data_json)
return r.json()
# Interface_response = Interface()
#批量运⾏定时红包接⼝
num = 1000
Pointer = 0
while running:
if Pointer == num:
running = False
print("已成功运⾏%s次" %Pointer)
else:
Interface_response = Interface()
Pointer = Pointer + 1
print(Interface_response)
time.sleep(1)
sys.stdout.flush()  #强制刷新缓存区
#循环查询sql并打印
base_list = (1,2,3,4,5,6,7,8,9,10)  #对应奖励类型,1 cpa.cpl 通⽤  2 cpa 通⽤  3 cpa试玩  4 cpa 截图  5 cpa 答题  6 cpa ASO  7 所有cpl任务  8  cpl 所有充值规则for base in base_list:
sql_values = ute(sql.format(userId,base)) #引⼊全局sql,拼接变量进⾏传参
repare = cursor.fetchall()
result_list = list(chain.from_iterable(repare))  #游标输出内容为⼆维元组,需将⼆维元组转换为⼀维元组后计算
print(result_list[0],end=',')
#sql查询
def all():
cpa_aso = ute("SELECT COUNT(id) FROM card_user_record WHERE userId = %s" % userId)
repare = cursor.fetchone()  #fetchone只取⼀条结果
return repare[0]
result_all = all()
print("发放总数量:",result_all)
cursor.close()
cnn.close()
#计算概率
for probability in result_list:
calculation = ('persent:{:.2f}%'.format(probability/result_all*100))
print(calculation,end=',  ')
四、技能学习
1、fetch函数⽤法
fetchone()的⽤法
# 包含选中元素(name, friends)的第⼀⾏,以tuple形式存在:
row = cur.fetchone()
# row[0]代表name
# row[1]代表friends
fetchall()的⽤法
# 包含选中元素(name, friends)的所有⾏,以list(tuple, tuple, ...)形式存在:
allrows = cur.fetchall()
# allrows[i]代表“第i⾏的tuple”
# allrows[i][0]代表“第i⾏的tuple的name”
# allrows[i][1]代表“第i⾏的tuple的friends”
fetchmany(size=cursor.arraysize)的⽤法
类似fetchall()。不同处在于使⽤ size 参数限制了每次fetch到的row的数量
2、百分⽐计算
【⽅式1 】
格式化为float ,然后处理成%格式,需要对分⼦/分母 * 100如下:
percentList.append('{:.2f}%'.format(member/denominator*100))
【⽅式2 】
直接使⽤参数格式化:{:.2%} {:.2%}:显⽰⼩数点后2位
【说明】
{ } 的意思是对应format()的⼀个参数,按默认顺序对应,参数序号从0开始,{0}对应format()的第⼀个参数,{1}对应第⼆个参数。
3、python强制刷新缓存区
Python ⽂件 flush() ⽅法是⽤来把⽂件从内存buffer(缓冲区)中强制刷新到硬盘中,同时清空缓冲区。
⼀般情况下,⽂件关闭后会⾃动刷新到硬盘中,但有时你需要在关闭前刷新到硬盘中,这时就可以使⽤ flush() ⽅法。
flush() ⽅法语法如下:
fileObject.flush()
4、将 pymysql 返回的⼆维元组数据转换为列表
from itertools import chain  # 将⼆维元组转换为⼀维元组
data=(("123","234"),("345","456"))
print(list(chain.from_iterable(test)))

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。