linux下的c++线程池实现
我设计这个线程池的初衷是为了与socket对接的。线程池的实现千变万化,我得这个并不⼀定是最好的,但却是否和我⼼⽬中需求模型的。现把部分设计思路和代码贴出,以期抛砖引⽟。个⼈⽐较喜欢搞开源,所以⼤家如果觉得有什么需要改善的地⽅,欢迎给予评论。思前想后,也没啥设计图能表达出设计思想,就把类图贴出来吧。
类图设计如下:
Command类是我们的业务类。这个类⾥只能存放简单的内置类型,这样⽅便与socket的直接传输。我定义了⼀个cmd_成员⽤于存放命令字,arg_⽤于存放业务的参数。这个参数可以使⽤分隔符来分隔各个参数。我设计的只是简单实现,如果有序列化操作了,完全不需要使⽤我这种⽅法啦。
ThreadProcess就是业务处理类,这⾥边定义了各个⽅法⽤于进⾏业务处理,它将在ThreadPool中的Process函数中调⽤。
ThreadPool就是我们的线程池类。其中的成员变量都是静态变量,Process就是线程处理函数。
#define MAX_THREAD_NUM 50 // 该值⽬前需要设定为初始线程数的整数倍
#define ADD_FACTOR 40 // 该值表⽰⼀个线程可以处理的最⼤任务数
#define THREAD_NUM 10 // 初始线程数
bshutdown_:⽤于线程退出。
command_:⽤于存放任务队列
command_cond_:条件变量
command_mutex_:互斥锁
icurr_thread_num_:当前线程池中的线程数
thread_id_map_:这个map⽤于存放线程对应的其它信息,我只存放了线程的状态,0为正常,1为退出。还可以定义其它的结构来存放更多的信息,例如存放套接字。
InitializeThreads:⽤于初始化线程池,先创建THREAD_NUM个线程。后期扩容也需要这个函数。
Process:线程处理函数,这⾥边会调⽤AddThread和DeleteThread在进⾏线程池的伸缩。
AddWork:往队列中添加⼀个任务。
ThreadDestroy:线程销毁函数。
AddThread:扩容THREAD_NUM个线程
DeleteThread:如果任务队列为空,则将原来的线程池恢复到THREAD_NUM个。这⾥可以根据需要进⾏修改。
以下贴出代码以供⼤家参考。
command.h
#ifndef COMMAND_H_
#define COMMAND_H_
class Command
{
public:
int get_cmd();
char* get_arg();
void set_cmd(int cmd);
void set_arg(char* arg);
private:
int cmd_;
char arg_[65];
};
#endif /* COMMAND_H_ */
command.cpp
#include <string.h>
#include "command.h"
int Command::get_cmd()
{
return cmd_;
}
char* Command::get_arg()
{
return arg_;
}
void Command::set_cmd(int cmd)
{
cmd_ = cmd;
}
void Command::set_arg(char* arg)
{
if(NULL == arg)
{
return;
}
strncpy(arg_,arg,64);
arg_[64] = '\0';
}
thread_process.h
#ifndef THREAD_PROCESS_H_
#define THREAD_PROCESS_H_
class ThreadProcess
{
public:
void Process0(void* arg);
void Process1(void* arg);
void Process2(void* arg);
};
#endif /* THREAD_PROCESS_H_ */
thread_process.cpp
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include "thread_process.h"
linux下的sleep函数void ThreadProcess::Process0(void* arg)
{
printf("thread %u is starting process %s\n",pthread_self(),arg);
usleep(100*1000);
}
void ThreadProcess::Process1(void* arg)
{
printf("thread %u is starting process %s\n",pthread_self(),arg);
usleep(100*1000);
}
void ThreadProcess::Process2(void* arg)
{
printf("thread %u is starting process %s\n",pthread_self(),arg);
usleep(100*1000);
}
thread_pool.h
#ifndef THREAD_POOL_H_
#define THREAD_POOL_H_
#include <map>
#include <vector>
#include "command.h"
#define MAX_THREAD_NUM 50 // 该值⽬前需要设定为初始线程数的整数倍
#define ADD_FACTOR 40 // 该值表⽰⼀个线程可以处理的最⼤任务数
#define THREAD_NUM 10 // 初始线程数
class ThreadPool
{
public:
ThreadPool() {};
static void InitializeThreads();
void AddWork(Command command);
void ThreadDestroy(int iwait = 2);
private:
static void* Process(void* arg);
static void AddThread();
static void DeleteThread();
static bool bshutdown_;
static int icurr_thread_num_;
static std::map<pthread_t,int> thread_id_map_;
static std::vector<Command> command_;
static pthread_mutex_t command_mutex_;
static pthread_cond_t command_cond_;
};
#endif /* THREAD_POOL_H_ */
thread_pool.cpp
#include <pthread.h>
#include <stdlib.h>
#include "thread_pool.h"
#include "thread_process.h"
#include "command.h"
bool ThreadPool::bshutdown_ = false;
int ThreadPool::icurr_thread_num_ = THREAD_NUM;
std::vector<Command> ThreadPool::command_;
std::map<pthread_t,int> ThreadPool::thread_id_map_;
pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER;
void ThreadPool::InitializeThreads()
{
for (int i = 0; i < THREAD_NUM ; ++i)
{
pthread_t tempThread;
pthread_create(&tempThread, NULL, ThreadPool::Process, NULL);
thread_id_map_[tempThread] = 0;
}
}
void* ThreadPool::Process(void* arg)
{
ThreadProcess threadprocess;
Command command;
while (true)
{
pthread_mutex_lock(&command_mutex_);
// 如果线程需要退出,则此时退出
if (1 == thread_id_map_[pthread_self()])
{
pthread_mutex_unlock(&command_mutex_);
printf("thread %u will exit\n", pthread_self());
pthread_exit(NULL);
}
/
/ 当线程不需要退出且没有需要处理的任务时,需要缩容的则缩容,不需要的则等待信号if (0 == command_.size() && !bshutdown_)
{
if(icurr_thread_num_ > THREAD_NUM)
{
DeleteThread();
if (1 == thread_id_map_[pthread_self()])
{
pthread_mutex_unlock(&command_mutex_);
printf("thread %u will exit\n", pthread_self());
pthread_exit(NULL);
}
}
pthread_cond_wait(&command_cond_,&command_mutex_);
}
// 线程池需要关闭,关闭已有的锁,线程退出
if(bshutdown_)
{
pthread_mutex_unlock (&command_mutex_);
printf ("thread %u will exit\n", pthread_self ());
pthread_exit (NULL);
}
// 如果线程池的最⼤线程数不等于初始线程数,则表明需要扩容if(icurr_thread_num_ < command_.size()))
{
AddThread();
}
// 从容器中取出待办任务
std::vector<Command>::iterator iter = command_.begin();
command.set_arg(iter->get_arg());
command.set_cmd(iter->get_cmd());
command_.erase(iter);
pthread_mutex_unlock(&command_mutex_);
// 开始业务处理
_cmd())
{
case0:
threadprocess._arg());
break;
case1:
threadprocess._arg());
break;
case2:
threadprocess._arg());
break;
default:
break;
}
}
return NULL; // 完全为了消除警告(eclipse编写的代码,警告很烦⼈) }
void ThreadPool::AddWork(Command command)
{
bool bsignal = false;
pthread_mutex_lock(&command_mutex_);
if (0 == command_.size())
{
bsignal = true;
}
command_.push_back(command);
pthread_mutex_unlock(&command_mutex_);
if (bsignal)
{
pthread_cond_signal(&command_cond_);
}
}
void ThreadPool::ThreadDestroy(int iwait)
{
while(0 != command_.size())
{
sleep(abs(iwait));
}
bshutdown_ = true;
pthread_cond_broadcast(&command_cond_);
std::map<pthread_t,int>::iterator iter = thread_id_map_.begin(); for (; iter!=thread_id_map_.end(); ++iter)
{
pthread_join(iter->first,NULL);
}
pthread_mutex_destroy(&command_mutex_);
pthread_cond_destroy(&command_cond_);
}
void ThreadPool::AddThread()
{
if(((icurr_thread_num_*ADD_FACTOR) < command_.size())
&& (MAX_THREAD_NUM != icurr_thread_num_))
{
InitializeThreads();
icurr_thread_num_ += THREAD_NUM;
}
}
void ThreadPool::DeleteThread()
{
int size = icurr_thread_num_ - THREAD_NUM;
std::map<pthread_t,int>::iterator iter = thread_id_map_.begin(); for(int i=0; i<size; ++i,++iter)
{
iter->second = 1;
}
}
main.cpp
#include "thread_pool.h"
#include "command.h"
int main()
{
ThreadPool thread_pool;
thread_pool.InitializeThreads();
Command command;
char arg[8] = {0};
for(int i=1; i<=1000; ++i)
{
command.set_cmd(i%3);
sprintf(arg,"%d",i);
command.set_arg(arg);
thread_pool.AddWork(command);
}
sleep(10); // ⽤于测试线程池缩容
thread_pool.ThreadDestroy();
return0;
}
代码是按照google的开源c++编码规范编写。⼤家可以通过改变那⼏个宏的值来调整线程池。有问题⼤家⼀起讨论。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论