python阿⾥云oss实现直传签名与回调验证的⽰例⽅法签名
import base64
import json
import time
from datetime import datetime
import hmac
from hashlib import sha1
access_key_id = ''
# 请填写您的AccessKeySecret。
access_key_secret = ''
# host的格式为 dpoint ,请替换为您的真实信息。
host = ''
# callback_url为上传回调服务器的URL,请将下⾯的IP和Port配置为您⾃⼰的真实信息。
callback_url = ""
# ⽤户上传⽂件时指定的前缀。
upload_dir = 'user-dir-prefix/'
expire_time = 1200
expire_syncpoint = int(time.time() + expire_time)
policy_dict = {
'expiration': datetime.utcfromtimestamp(expire_syncpoint).isoformat() + 'Z',
'conditions': [
{"bucket": "test-paige"},
['starts-with', '$key', 'user/test/']
]
}
policy = json.dumps(policy_dict).strip()
policy_encode = base64.de())
signature = w(access_de(), policy_encode, sha1).digest())
callback_dict = {
'callbackUrl': callback_url,
'callbackBody': 'filename=${object}&size=${size}&mimeType=${mimeType}&height=${imageInfo.height}&width=${'
'imageInfo.width}',
'callbackBodyType': 'application/json'
}
callback = base64.b64encode(json.dumps(callback_dict).strip().encode()).decode()
var = {
'accessid': access_key_id,
'host': host,
'policy': policy_encode.decode(),
'signature': signature.decode().strip(),
'expire': expire_syncpoint,
'callback': callback
}
回调验签
import asyncio
import base64
import time
import aiomysql
import rsa
from aiohttp import web, ClientSession
from urllib import parse
import uuid
def success(msg='', data=None):
if data is None:
data = {}
dict_data = {
'code': 1,
'msg': msg,
'data': data
}
签名字符串是什么
return web.json_response(dict_data)
def failed(msg='', data=None):
if data is None:
data = {}
dict_data = {
'code': 0,
'msg': msg,
'data': data
}
return web.json_response(dict_data)
async def handle(request):
"""
获取连接池
:param web.BaseRequest request:
:return:
"""
authorization_base64 = request.headers['authorization']
x_oss_pub_key_url_base64 = request.headers['x-oss-pub-key-url']
pub_key_url = base64.b64decode(x_oss_pub_key_de())
authorization = base64.b64decode(de())
path = request.path
async with ClientSession() as session:
async (pub_key_url.decode()) as resp:
pub_key_body = ()
pubkey = rsa.PublicKey.load_pkcs1_openssl_pem(pub_de())
body = t.read()
auth_str = parse.unquote(path) + '\n' + body.decode()
parse_url = parse.parse_qs(body.decode())
print(parse_url)
try:
rsa.verify(de(), authorization, pubkey)
pool = request.app['mysql_pool']
async with pool.acquire() as conn:
async with conn.cursor() as cur:
id = str(uuid.uuid4())
url = parse_url['filename'][0]
mime = parse_url['mimeType'][0]
disk = 'oss'
time_at = time.strftime("%Y-%m-%d %H:%I:%S", time.localtime())
sql = "INSERT INTO media(id,url,mime,disk,created_at,updated_at) VALUES(%s,%s,%s,%s,%s,%s)"
ute(sql, (id, url, mime, disk, time_at, time_at))
await connmit()
dict_data = {
'id': id,
'url': url,
'cdn_url': 'cdn.***' + '/' + url,
'mime': mime,
'disk': disk,
'created_at': time_at,
'updated_at': time_at,
}
return success(data=dict_data)
except rsa.pkcs1.VerificationError:
return failed(msg='验证错误')
async def init(loop):
# 创建连接池
mysql_pool = ate_pool(host='127.0.0.1', port=3306,
user='', password='',
db='', loop=loop)
async def on_shutdown(application):
"""
接收到关闭信号时,要先关闭连接池,并等待连接池关闭成功.
:param web.Application application:
:return:
"""
application['mysql_pool'].close()
# 没有下⾯这句话会报错 RuntimeError: Event loop is closed ,因为连接池没有真正关关闭程序就关闭了,引发python的报错    await application['mysql_pool'].wait_closed()
application = web.Application()
<_shutdown.append(on_shutdown)
# 把连接池放到 application 实例中
application['mysql_pool'] = mysql_pool
application.add_routes([('/', handle), web.post('/oss', handle)])
return application
if __name__ == '__main__':
loop = _event_loop()
application = loop.run_until_complete(init(loop))
web.run_app(application, host='127.0.0.1')
loop.close()
到此这篇关于python 阿⾥云oss实现直传签名与回调验证的⽂章就介绍到这了,更多相关python 直传签名与回调验证内容请搜索以前的⽂章或继续浏览下⾯的相关⽂章希望⼤家以后多多⽀持!

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。