对thrift中的TThreadPoolServer进⾏流量限制
thrift 对应 C++ 服务端实现中,基于线程池实现的TThreadPoolServer是最常⽤的实现之⼀。在使⽤过程中,有些场景需要对流量进⾏限制。在分析具体实现后,发现原有实现没法很好的完成这个需求,因此通过包装ThreadManager实现了这个功能。
逻辑分析
TThreadPoolServer使⽤ThreadManager作为线程池。⾃带的ThreadManager⽀持传⼊线程池线程数和最⼤任务数作为参数。
/**
* Creates a simple thread manager the uses count number of worker threads and has
* a pendingTaskCountMax maximum pending tasks. The default, 0, specified no limit
* on pending tasks
*/
static stdcxx::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count = 4,
size_t pendingTaskCountMax = 0);
在TThreadPoolServer接受⼀个新连接后,通过ThreadManager::add接⼝,把新建连接任务传⼊线程池处理,具体处理逻辑如下:
void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
Guard g(mutex_, timeout);
if (!g) {
throw TimedOutException();
}
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException(
"ThreadManager::Impl::add ThreadManager "
"not started");
}
// 设置了任务队列长度限制,并且任务队列长度超过限制,尝试删除⼀个过期任务
if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
removeExpired(true);
}
// 仍然超过限制,如果设置超时则等待,否则直接抛出TooManyPendingTasksException异常
if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
if (canSleep() && timeout >= 0) {
while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
// This is thread safe because the mutex is shared between monitors.
maxMonitor_.wait(timeout);
}
} else {
throw TooManyPendingTasksException();
}
}
tasks_.push_back(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
// If idle thread is available notify it, otherwise all worker threads are
// running and will get around to this task in time.
if (idleCount_ > 0) {
monitor_.notify();
}
}
通过上述代码可以发现,默认实现只有在设置了任务超时并且队列超过长度时,才会尝试删除任务,否则就会等待或者抛出异常,⽽外层实现是没有处理TooManyPendingTasksException这个异常的。在设计上,是为了确保收到的请求都进⾏处理,⽽选择等待。
具体实现
为了限制队列长度,在请求过多时主动丢弃请求。我通过封装ThreadManager的标准实现,修改了队列的处理逻辑。
#pragma once
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/Mutex.h>
namespace apache {
namespace thrift {
namespace concurrency {
class LimitThreadManager : public apache::thrift::concurrency::ThreadManager
{
public:
LimitThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0) {
_threadManager = newSimpleThreadManager(workerCount, 0);thrift
_pendingTaskCountMax = pendingTaskCountMax;
}
virtual ~LimitThreadManager() {}
virtual void start() {
_threadManager->start();
}
virtual void stop() {
_threadManager->stop();
}
virtual STATE state() const {
return _threadManager->state();
}
virtual stdcxx::shared_ptr<ThreadFactory> threadFactory() const {
return _threadManager->threadFactory();
}
virtual void threadFactory(stdcxx::shared_ptr<ThreadFactory> value) {
_threadManager->threadFactory(value);
}
virtual void addWorker(size_t value = 1) {
_threadManager->addWorker(value);
}
virtual void removeWorker(size_t value = 1) {
_threadManager->removeWorker(value);
}
virtual size_t idleWorkerCount() const {
return _threadManager->idleWorkerCount();
}
virtual size_t workerCount() const {
return _threadManager->workerCount();
}
virtual size_t pendingTaskCount() const {
return _threadManager->pendingTaskCount();
}
virtual size_t totalTaskCount() const {
return _threadManager->totalTaskCount();
}
virtual size_t pendingTaskCountMax() const {
return _pendingTaskCountMax;
}
}
virtual size_t expiredTaskCount() {
return _threadManager->expiredTaskCount();
}
/* 添加⼀个任务到⼯作线程
* 和默认实现不同,这个任务在设置 pendingTaskCountMax() 并且 pendingTaskCount()
* ⼤于等于 pendingTaskCountMax() 时不会堵塞,⽽是直接丢掉请求。
* 因此,也不会抛出 TooManyPendingTasksException 异常
* 参数 timeout 和 expiration ⽆效
*/
virtual void add(stdcxx::shared_ptr<Runnable> task, int64_t timeout = 0LL, int64_t expiration = 0LL) {
Guard g(_mutex);
if(_pendingTaskCountMax > 0 && _threadManager->pendingTaskCount() >= _pendingTaskCountMax) {
if(_expireCallback)
_expireCallback(task);
return;
}
_threadManager->add(task, 0, 0);
}
virtual void remove(stdcxx::shared_ptr<Runnable> task) {
_threadManager->remove(task);
}
virtual stdcxx::shared_ptr<Runnable> removeNextPending() {
return _threadManager->removeNextPending();
}
// 去掉超时删除任务功能
virtual void removeExpiredTasks() {
}
// 修改失败回调设置
virtual void setExpireCallback(ExpireCallback expireCallback) {
Guard g(_mutex);
_expireCallback = expireCallback;
}
private:
stdcxx::shared_ptr<ThreadManager> _threadManager;
size_t _pendingTaskCountMax;
Mutex _mutex;
ExpireCallback _expireCallback;
};
}
}
}
在使⽤时,直接⽤LimitThreadManager代替默认的ThreadManager,则可以设置缓存队列最⼤长度。当缓存队列已满时丢弃新来的请求。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论