⼀份C++线程池的代码,⾮常实⽤
1. #ifndef _ThreadPool_H_
2. #define _ThreadPool_H_
3. #pragma warning(disable: 4530)
4. #pragma warning(disable: 4786)
5. #include <cassert>
6. #include <vector>
7. #include <queue>
8. #include <windows.h>
9.
10. using namespace std;
11.
12. class ThreadJob  //⼯作基类
13. {
14. public:
15.  //供线程池调⽤的虚函数
16.  virtual void DoJob(void *pPara) = 0;
17. };
18. class ThreadPool
19. {
20. public:
21.  //dwNum 线程池规模
22.  ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0)
23.  {
24.    InitializeCriticalSection(&_csThreadVector);
25.    InitializeCriticalSection(&_csWorkQueue);
26.    _EventComplete = CreateEvent(0, false, false, NULL);
27.    _EventEnd = CreateEvent(0, true, false, NULL);
28.    _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
29.    _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
30.    assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
31.    assert(_EventComplete != INVALID_HANDLE_VALUE);
32.    assert(_EventEnd != INVALID_HANDLE_VALUE);
33.    assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
34.    AdjustSize(dwNum <= 0 ? 4 : dwNum);
35.  }
36.  ~ThreadPool()
37.  {
38.    DeleteCriticalSection(&_csWorkQueue);
39.    CloseHandle(_EventEnd);
40.    CloseHandle(_EventComplete);
41.    CloseHandle(_SemaphoreCall);
42.    CloseHandle(_SemaphoreDel);
43.
44.    vector<ThreadItem*>::iterator iter;
45.    for(iter = _ThreadVector.begin(); iter != _d(); iter++)
46.    {
47.      if(*iter)
48.        delete *iter;
49.    }
50.    DeleteCriticalSection(&_csThreadVector);
51.  }
52.  //调整线程池规模
53.  int AdjustSize(int iNum)
54.  {
55.    if(iNum > 0)
56.    {
57.      ThreadItem *pNew;
58.      EnterCriticalSection(&_csThreadVector);
59.      for(int _i=0; _i<iNum; _i++)
60.      {
61.        _ThreadVector.push_back(pNew = new ThreadItem(this));
62.        assert(pNew);
63.        pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
64.        // set priority
65.        SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);
66.        assert(pNew->_Handle);
67.      }
68.      LeaveCriticalSection(&_csThreadVector);
69.    }
70.    else
71.    {
72.      iNum *= -1;
73.      ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
74.    }
75.    return (int)_lThreadNum;
76.  }
77.  //调⽤线程池
78.  void Call(void (*pFunc)(void  *), void *pPara = NULL)
79.  {
80.    assert(pFunc);
81.    EnterCriticalSection(&_csWorkQueue);
82.    _JobQueue.push(new JobItem(pFunc, pPara));
83.    LeaveCriticalSection(&_csWorkQueue);
84.    ReleaseSemaphore(_SemaphoreCall, 1, NULL);
85.  }
86.  //调⽤线程池
87.  inline void Call(ThreadJob * p, void *pPara = NULL)
88.  {
89.    Call(CallProc, new CallProcPara(p, pPara));
90.  }
91.  //结束线程池, 并同步等待
92.  bool EndAndWait(DWORD dwWaitTime = INFINITE)
93.  {
94.    SetEvent(_EventEnd);
95.    return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
96.  }
97.  //结束线程池
98.  inline void End()
99.  {
100.    SetEvent(_EventEnd);
101.  }
102.  inline DWORD Size()
103.  {
104.    return (DWORD)_lThreadNum;
105.  }
106.  inline DWORD GetRunningSize()
107.  {
108.    return (DWORD)_lRunningNum;
109.  }
110.  bool IsRunning()
111.  {
112.    return _lRunningNum > 0;
113.  }
114. protected:
115.  //⼯作线程
116.  static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
117.  {
118.    ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
119.    assert(pThread);
120.    ThreadPool *pThreadPoolObj = pThread->_pThis;
121.    assert(pThreadPoolObj);
122.    InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
123.    HANDLE hWaitHandle[3];
124.    hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
125.    hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
126.    hWaitHandle[2] = pThreadPoolObj->_EventEnd;
127.    JobItem *pJob;
128.    bool fHasJob;
129.
130.    for(;;)
131.    {
132.      DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
133.      //响应删除线程信号
134.      if(wr == WAIT_OBJECT_0 + 1)
135.        break;
136.
137.      //从队列⾥取得⽤户作业
138.      EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
139.      if(fHasJob = !pThreadPoolObj->_pty())
140.      {
141.        pJob = pThreadPoolObj->_JobQueue.front();
142.        pThreadPoolObj->_JobQueue.pop();
143.        assert(pJob);
144.      }
145.      LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
146.      //受到结束线程信号确定是否结束线程(结束线程信号 && 是否还有⼯作)
147.      if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)
148.        break;
149.      if(fHasJob && pJob)
150.      {
151.        InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
152.        pThread->_dwLastBeginTime = GetTickCount();
153.        pThread->_dwCount++;
154.        pThread->_fIsRunning = true;
155.        pJob->_pFunc(pJob->_pPara); //运⾏⽤户作业
156.        delete pJob;
157.        pThread->_fIsRunning = false;
158.        InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
159.      }
160.    }
161.    //删除⾃⾝结构
162.    EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
163.    pThreadPoolObj->_ase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_d(), pThread));
164.    LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
165.    delete pThread;
166.    InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
167.    if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
168.      SetEvent(pThreadPoolObj->_EventComplete);
169.    return 0;
170.  }
171.  //调⽤⽤户对象虚函数
172.  static void CallProc(void *pPara)
173.  {
174.    CallProcPara *cp = static_cast<CallProcPara *>(pPara);
175.    assert(cp);
176.    if(cp)
177.    {
178.      cp->_pObj->DoJob(cp->_pPara);
179.      delete cp;
180.    }
181.  }
182.  //⽤户对象结构
183.  struct CallProcPara
184.  {
185.    ThreadJob* _pObj;//⽤户对象
186.    void *_pPara;//⽤户参数
187.    CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
188.  };
189.  //⽤户函数结构
190.  struct JobItem
191.  {
192.    void (*_pFunc)(void  *);//函数
193.    void *_pPara; //参数
194.    JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
195.  };
196.  //线程池中的线程结构
197.  struct ThreadItem
waitforsingleobject函数
198.  {
199.    HANDLE _Handle; //线程句柄
200.    ThreadPool *_pThis;  //线程池的指针
201.    DWORD _dwLastBeginTime; //最后⼀次运⾏开始时间
202.    DWORD _dwCount; //运⾏次数
203.    bool _fIsRunning;
204.    ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { }; 205.    ~ThreadItem()
206.    {
207.      if(_Handle)
208.      {
209.        CloseHandle(_Handle);
210.        _Handle = NULL;
211.      }
212.    }
213.  };
214.
215.  std::queue<JobItem *> _JobQueue;  //⼯作队列
216.  std::vector<ThreadItem *>  _ThreadVector; //线程数据
217.  CRITICAL_SECTION _csThreadVector, _csWorkQueue; //⼯作队列临界, 线程数据临界
218.  HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, ⼯作信号,删除线程信号
219.  long _lThreadNum, _lRunningNum; //线程数, 运⾏的线程数
220. };
221. #endif //_ThreadPool_H_
转载⾃
基本上是拿来就⽤了,对WIN32 API不熟,但对线程池的逻辑还是⽐较熟的,认为这个线程池写得很清晰,我拿来⽤在⼀个多线程下载的模块中。很实⽤的东东。
调⽤⽅法
void threadfunc(void *p)
{
YourClass* yourObject = (YourClass*)    p;
//... } ThreadPool tp; for(i=0; i<100; i++)  tp.Call(threadfunc);
ThreadPool tp(20);//20为初始线程池规模
tp.Call(threadfunc, lpPara);
使⽤时注意⼏点:
1. ThreadJob  没什么⽤,直接写线程函数吧。
2. 线程函数(threadfunc)的⼊⼝参数void* 可以转成⾃定义的类型对象,这个对象可以记录下线程运⾏中的数据,并设置线程当前状态,以此与线程进⾏交互。
3. 线程池有⼀个EndAndWait函数,⽤于让线程池中所有计算正常结束。有时线程池中的⼀个线程可能要运⾏很长时间,怎么办?可以通过线程函数threadfunc的⼊⼝参数对象来处理,⽐如:
class YourClass {
int cmd; // cmd = 1是上线程停⽌计算,正常退出。
};
threadfunc(void* p) {
YourClass* yourObject = (YourClass*)p;
while (true) {
// do some calculation
if (yourClass->cmd == 1)
break;
}
}
在主线程中设置yourClass->cmd = 1,该线程就会⾃然结束。
很简洁通⽤的线程池实现。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。