基于web端的深度学习模型部署
1.1 web服务与技术框架
下⾯以ResNet50预训练模型为例,旨在展⽰⼀个轻量级的深度学习模型部署,写⼀个较为简单的图像分类的REST API。主要技术框架为Keras+Flask+Redis。其中Keras作为模型框架、Flask作为后端Web框架、Redis则是⽅便以键值形式存储图像的数据库。各主要package版本:
tensorflow 1.14
keras 2.2.4
flask 1.1.1
redis 3.3.8
先简单说⼀下Web服务,⼀个Web应⽤的本质⽆⾮就是客户端发送⼀个HTTP请求,然后服务器收到请求后⽣成⼀个HTML⽂档作为响应返回给客户端的过程。在部署深度学习模型时,⼤多时候我们不需要搞⼀个前端页⾯出来,⼀般是以REST API的形式提供给开发调⽤。那么什么是API呢?很简单,如果⼀个URL返回的不是HTML,⽽是机器能直接解析的数据,这样的⼀个URL就可以看作是⼀个API。
先开启Redis服务:
redis-server
1.2 服务配置
定义⼀些配置参数:
IMAGE_WIDTH = 224
IMAGE_HEIGHT = 224
IMAGE_CHANS = 3
IMAGE_DTYPE = "float32"
IMAGE_QUEUE = "image_queue"
BATCH_SIZE = 32
SERVER_SLEEP = 0.25
CLIENT_SLEEP = 0.25
指定输⼊图像⼤⼩、类型、batch_size⼤⼩以及Redis图像队列名称。
然后创建Flask对象实例,建⽴Redis数据库连接:
app = flask.Flask(__name__)
db = redis.StrictRedis(host="localhost", port=6379, db=0)
model = None
因为图像数据作为numpy数组不能直接存储到Redis中,所以图像存⼊到数据库之前需要将其序列化编码,从数据库取出时再将其反序列化解码即可。分别定义编码和解码函数:
def base64_encode_image(img):
return base64.b64encode(img).decode("utf-8")
def base64_decode_image(img, dtype, shape):
if sys.version_info.major == 3:
img = bytes(img, encoding="utf-8")
img = np.frombuffer(base64.decodebytes(img), dtype=dtype)
img = shape(shape)
return img
另外待预测图像还需要进⾏简单的预处理,定义预处理函数如下:
def prepare_image(image, target):
# if the image mode is not RGB, convert it
de != "RGB":
image = vert("RGB")
# resize the input image and preprocess it
image = size(target)
image = img_to_array(image)
# expand image as one batch like shape (1, c, w, h)
image = np.expand_dims(image, axis=0)
image = imagenet_utils.preprocess_input(image)
# return the processed image
return image
1.3 预测接⼝定义
准备⼯作完毕之后,接下来就是主要的两⼤部分:模型预测部分和app后端响应部分。先定义模型预测函数如下:
def classify_process():
# 导⼊模型
print("* ")
model = ResNet50(weights="imagenet")
print("* Model loaded")
while True:
# 从数据库中创建预测图像队列
queue = db.lrange(IMAGE_QUEUE, 0, BATCH_SIZE - 1)
imageIDs = []
batch = None
# 遍历队列
for q in queue:
# 获取队列中的图像并反序列化解码
q = json.loads(q.decode("utf-8"))
image = base64_decode_image(q["image"], IMAGE_DTYPE,
(1, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANS))            # 检查batch列表是否为空
if batch is None:
batch = image
# 合并batch
else:
batch = np.vstack([batch, image])
# 更新图像ID
imageIDs.append(q["id"])
web后端是指什么if len(imageIDs) > 0:
print("* Batch size: {}".format(batch.shape))
preds = model.predict(batch)
results = imagenet_utils.decode_predictions(preds)
# 遍历图像ID和预测结果并打印
for (imageID, resultSet) in zip(imageIDs, results):
# initialize the list of output predictions
output = []
# loop over the results and add them to the list of
# output predictions
for (imagenetID, label, prob) in resultSet:
r = {"label": label, "probability": float(prob)}
output.append(r)
# 保存结果到数据库
db.set(imageID, json.dumps(output))
# 从队列中删除已预测过的图像
db.ltrim(IMAGE_QUEUE, len(imageIDs), -1)
time.sleep(SERVER_SLEEP)
然后定义app服务:
@ute("/predict", methods=["POST"])
def predict():
# 初始化数据字典
data = {"success": False}
# 确保图像上传⽅式正确
hod == "POST":
("image"):
# 读取图像数据
image = quest.files["image"].read()
image = Image.open(io.BytesIO(image))
image = prepare_image(image, (IMAGE_WIDTH, IMAGE_HEIGHT))
# 将数组以C语⾔存储顺序存储
image = py(order="C")
# ⽣成图像ID
k = str(uuid.uuid4())
d = {"id": k, "image": base64_encode_image(image)}
db.rpush(IMAGE_QUEUE, json.dumps(d))
# 运⾏服务
while True:
# 获取输出结果
output = db.get(k)
if output is not None:
output = output.decode("utf-8")
data["predictions"] = json.loads(output)
db.delete(k)
break
time.sleep(CLIENT_SLEEP)
data["success"] = True
return flask.jsonify(data)
Flask使⽤Python装饰器在内部⾃动将请求的URL和⽬标函数关联了起来,这样⽅便我们快速搭建⼀个Web服务。
1.4 接⼝测试
服务搭建好了之后我们可以⽤⼀张图⽚来测试⼀下效果:
curl -X POST -F image=@test.jpg '127.0.0.1:5000/predict'
模型端的返回:
预测结果返回:
最后我们可以给搭建好的服务进⾏⼀个压⼒测试,看看服务的并发等性能如何,定义⼀个压测⽂件stress_test.py 如下:
from threading import Thread
import requests
import time
# 请求的URL
KERAS_REST_API_URL = "127.0.0.1:5000/predict"
# 测试图⽚
IMAGE_PATH = "test.jpg"
# 并发数
NUM_REQUESTS = 500
# 请求间隔
SLEEP_COUNT = 0.05
def call_predict_endpoint(n):
# 上传图像
image = open(IMAGE_PATH, "rb").read()
payload = {"image": image}
# 提交请求
r = requests.post(KERAS_REST_API_URL, files=payload).json()    # 确认请求是否成功
if r["success"]:
print("[INFO] thread {} OK".format(n))
else:
print("[INFO] thread {} FAILED".format(n))
# 多线程进⾏
for i in range(0, NUM_REQUESTS):
# 创建线程来调⽤api
t = Thread(target=call_predict_endpoint, args=(i,))
t.daemon = True
t.start()
time.sleep(SLEEP_COUNT)
time.sleep(300)
测试效果如下:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。