python3实现公众平台(和企业)消息被动回
复以及加解密
公众平台消息加密被动回复以及加解密,官⽅提供的是python2⽰例代码:
现将上述代码修改成python3版本,且⽀持中⽂编码消息:
⼀、:
WXBizMsgCrypt.py
#!/usr/bin/env python
# -*- encoding:utf-8 -*-
"""
python3对公众平台发送给公众账号的消息加解密代码.⽀持中⽂.
"""
# ------------------------------------------------------------------------
import base64
import string
import random
import hashlib
import time
import struct
import binascii
from Crypto.Cipher import AES
cElementTree as ET
import socket
from wx_encry import ierror
""" AES加解密⽤ pycrypto """
class FormatException(Exception):
pass
def throw_exception(message, exception_class=FormatException):
"""my define raise exception function"""
raise exception_class(message)
class SHA1:
"""计算公众平台的消息签名接⼝"""
def getSHA1(self, token, timestamp, nonce, encrypt):
"""⽤SHA1算法⽣成安全签名
@param token: 票据
@param timestamp: 时间戳
@param encrypt: 密⽂
@param nonce: 随机字符串
@return: 安全签名
"""
try:
token = token.decode()
sortlist = [token, timestamp, nonce, encrypt]
sortlist.sort()
sha = hashlib.sha1()
sha.update("".join(sortlist).encode("utf8"))
return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_ComputeSignature_Error, None
class XMLParse(object):
"""提供提取消息格式中的密⽂及⽣成回复消息格式的接⼝"""
# xml消息模板
AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
<MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature> <TimeStamp>%(timestamp)s</TimeStamp>
<Nonce><![CDATA[%(nonce)s]]></Nonce>
</xml>"""
def extract(self, xmltext):
"""提取出xml数据包中的加密消息
@param xmltext: 待提取的xml字符串
@return: 提取出的加密消息字符串
"""
try:
xml_tree = ET.fromstring(xmltext)
encrypt = xml_tree.find("Encrypt")
touser_name = xml_tree.find("ToUserName")
return ierror.WXBizMsgCrypt_OK, , except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_ParseXml_Error, None, None
def generate(self, encrypt, signature, timestamp, nonce):
"""⽣成xml消息
@param encrypt: 加密后的消息密⽂
@param signature: 安全签名
@param timestamp: 时间戳
@param nonce: 随机字符串
@return: ⽣成的xml字符串
error parse new"""
resp_dict = {
'msg_encrypt': encrypt,
'msg_signaturet': signature,
'timestamp': timestamp,
'nonce': nonce,
}
resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict return resp_xml
class PKCS7Encoder(object):
"""提供基于PKCS7算法的加解密接⼝"""
block_size = 32
def encode(self, text):
""" 对需要加密的明⽂进⾏填充补位
@param text: 需要进⾏填充补位操作的明⽂
@return: 补齐明⽂字符串
"""
text_length = len(text)
# 计算需要填充的位数
amount_to_pad = self.block_size - (text_length % self.block_size) if amount_to_pad == 0:
amount_to_pad = self.block_size
# 获得补位所⽤的字符
pad = chr(amount_to_pad).encode()
pad = chr(amount_to_pad).encode()
return text + pad * amount_to_pad
def decode(self, decrypted):
"""删除解密后明⽂的补位字符
@param decrypted: 解密后的明⽂
@return: 删除补位字符后的明⽂
"""
pad = ord(decrypted[-1])
if pad < 1 or pad > 32:
pad = 0
return decrypted[:-pad]
class Prpcrypt(object):
"""提供接收和推送给公众平台消息的加解密接⼝"""
def __init__(self, key):
# self.key = base64.b64decode(key+"=")
self.key = key
# 设置加解密模式为AES的CBC模式
def encrypt(self, text, appid):
"""对明⽂进⾏加密
@param text: 需要加密的明⽂
@return: 加密得到的字符串
"""
# 16位随机字符串添加到明⽂开头
len_str = struct.pack("I", socket.htonl(de())))
# text = _random_str() + binascii.b2a_hex(len_str).decode() + text + appid text = _random_str() + len_str + de() + appid
# 使⽤⾃定义的填充⽅式对明⽂进⾏补位填充
pkcs7 = PKCS7Encoder()
text = de(text)
# 加密
cryptor = w(self.key, de, self.key[:16])
try:
ciphertext = pt(text)
# 使⽤BASE64对加密后的字符串进⾏编码
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext).decode('utf8') except Exception as e:
return ierror.WXBizMsgCrypt_EncryptAES_Error, None
def decrypt(self, text, appid):
"""对解密后的明⽂进⾏补位删除
@param text: 密⽂
@return: 删除填充补位后的明⽂
"""
try:
cryptor = w(self.key, de, self.key[:16])
# 使⽤BASE64对密⽂进⾏解码,然后AES-CBC解密
plain_text = cryptor.decrypt(base64.b64decode(text))
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_DecryptAES_Error, None
try:
# pad = ord(plain_text[-1])
pad = plain_text[-1]
# 去掉补位字符串
# pkcs7 = PKCS7Encoder()
# plain_text = de(plain_text)
# 去除16位随机字符串
content = plain_text[16:-pad]
xml_len = hl(struct.unpack("I", content[: 4])[0])
xml_content = content[4: xml_len + 4]
xml_content = content[4: xml_len + 4]
from_appid = content[xml_len + 4:]
except Exception as e:
return ierror.WXBizMsgCrypt_IllegalBuffer, None
if from_appid != appid:
return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
return 0, xml_content.decode()
def get_random_str(self):
""" 随机⽣成16位字符串
@return: 16位字符串
"""
rule = string.ascii_letters + string.digits
str = random.sample(rule, 16)
return "".join(str).encode()
class WXBizMsgCrypt(object):
# 构造函数
# @param sToken: 公众平台上,开发者设置的Token
# @param sEncodingAESKey: 公众平台上,开发者设置的EncodingAESKey
# @param sAppId: 企业号的AppId
def __init__(self, sToken, sEncodingAESKey, sAppId):
try:
self.key = base64.b64decode(sEncodingAESKey + "=")
assert len(self.key) == 32
except Exception:
throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
# return ierror.WXBizMsgCrypt_IllegalAesKey)
self.appid = de()
def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
# 将回复⽤户的消息加密打包
# @param sReplyMsg: 企业号待回复⽤户的消息,xml格式的字符串
# @param sTimeStamp: 时间戳,可以⾃⼰⽣成,也可以⽤URL参数的timestamp,如为None则⾃动⽤当前时间
# @param sNonce: 随机串,可以⾃⼰⽣成,也可以⽤URL参数的nonce
# sEncryptMsg: 加密后的可以直接回复⽤户的密⽂,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串, # return:成功0,sEncryptMsg,失败返回对应的错误码None
pc = Prpcrypt(self.key)
ret, encrypt = pc.encrypt(sReplyMsg, self.appid)
if ret != 0:
return ret, None
if timestamp is None:
timestamp = str(int(time.time()))
# ⽣成安全签名
sha1 = SHA1()
ret, signature = ken, timestamp, sNonce, encrypt)
if ret != 0:
return ret, None
xmlParse = XMLParse()
return ret, ate(encrypt, signature, timestamp, sNonce)
def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
# 检验消息的真实性,并且获取解密后的明⽂
# @param sMsgSignature: 签名串,对应URL参数的msg_signature
# @param sTimeStamp: 时间戳,对应URL参数的timestamp
# @param sNonce: 随机串,对应URL参数的nonce
# @param sPostData: 密⽂,对应POST请求的数据
# xml_content: 解密后的原⽂,当return返回0时有效
# @return: 成功0,失败返回对应的错误码
# 验证安全签名
xmlParse = XMLParse()
ret, encrypt, touser_name = act(sPostData)
if ret != 0:
if ret != 0:
return ret, None
sha1 = SHA1()
ret, signature = ken, sTimeStamp, sNonce, encrypt)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret, xml_content = pc.decrypt(encrypt, self.appid)
return ret, xml_content
⼆、现主要针对企业的使⽤做说明:
WXBizMsgCrypt.py代码与上述的⼤致相同,有些许基于企业的参数改动:
""" python3对企业发送给企业后台的消息加解密,⽀持中⽂
"""
# ------------------------------------------------------------------------
import base64
import string
import random
import hashlib
import time
import struct
import binascii
from Crypto.Cipher import AES
cElementTree as ET
import socket
from .ierror import ierror
"""
关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决⽅案
请到官⽅⽹站 www.dlitz/software/pycrypto/ 下载pycrypto。
下载后,按照README中的“Installation”⼩节的提⽰进⾏pycrypto安装。
"""
class FormatException(Exception):
pass
def throw_exception(message, exception_class=FormatException):
"""my define raise exception function"""
raise exception_class(message)
class SHA1:
"""计算企业的消息签名接⼝"""
def getSHA1(self, token, timestamp, nonce, encrypt):
"""⽤SHA1算法⽣成安全签名
@param token: 票据
@param timestamp: 时间戳
@param encrypt: 密⽂
@param nonce: 随机字符串
@return: 安全签名
"""
try:
sortlist = [token, timestamp, nonce, encrypt]
sortlist.sort()
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论