⼀份C++线程池的代码,⾮常实⽤
1. #ifndef _ThreadPool_H_
2. #define _ThreadPool_H_
3. #pragma warning(disable: 4530)
4. #pragma warning(disable: 4786)
5. #include <cassert>
6. #include <vector>
7. #include <queue>
8. #include <windows.h>
9.
10. using namespace std;
11.
12. class ThreadJob //⼯作基类
13. {
14. public:
15. //供线程池调⽤的虚函数
16. virtual void DoJob(void *pPara) = 0;
17. };
18. class ThreadPool
19. {
20. public:
21. //dwNum 线程池规模
22. ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0)
23. {
24. InitializeCriticalSection(&_csThreadVector);
25. InitializeCriticalSection(&_csWorkQueue);
26. _EventComplete = CreateEvent(0, false, false, NULL);
27. _EventEnd = CreateEvent(0, true, false, NULL);
28. _SemaphoreCall = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);
29. _SemaphoreDel = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);
30. assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
31. assert(_EventComplete != INVALID_HANDLE_VALUE);
32. assert(_EventEnd != INVALID_HANDLE_VALUE);
33. assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
34. AdjustSize(dwNum <= 0 ? 4 : dwNum);
35. }
36. ~ThreadPool()
37. {
38. DeleteCriticalSection(&_csWorkQueue);
39. CloseHandle(_EventEnd);
40. CloseHandle(_EventComplete);
41. CloseHandle(_SemaphoreCall);
42. CloseHandle(_SemaphoreDel);
43.
44. vector<ThreadItem*>::iterator iter;
45. for(iter = _ThreadVector.begin(); iter != _d(); iter++)
46. {
47. if(*iter)
48. delete *iter;
49. }
50. DeleteCriticalSection(&_csThreadVector);
51. }
52. //调整线程池规模
53. int AdjustSize(int iNum)
54. {
55. if(iNum > 0)
56. {
57. ThreadItem *pNew;
58. EnterCriticalSection(&_csThreadVector);
59. for(int _i=0; _i<iNum; _i++)
60. {
61. _ThreadVector.push_back(pNew = new ThreadItem(this));
62. assert(pNew);
63. pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
64. // set priority
65. SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);
66. assert(pNew->_Handle);
67. }
68. LeaveCriticalSection(&_csThreadVector);
69. }
70. else
71. {
72. iNum *= -1;
73. ReleaseSemaphore(_SemaphoreDel, iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
74. }
75. return (int)_lThreadNum;
76. }
77. //调⽤线程池
78. void Call(void (*pFunc)(void *), void *pPara = NULL)
79. {
80. assert(pFunc);
81. EnterCriticalSection(&_csWorkQueue);
82. _JobQueue.push(new JobItem(pFunc, pPara));
83. LeaveCriticalSection(&_csWorkQueue);
84. ReleaseSemaphore(_SemaphoreCall, 1, NULL);
85. }
86. //调⽤线程池
87. inline void Call(ThreadJob * p, void *pPara = NULL)
88. {
89. Call(CallProc, new CallProcPara(p, pPara));
90. }
91. //结束线程池, 并同步等待
92. bool EndAndWait(DWORD dwWaitTime = INFINITE)
93. {
94. SetEvent(_EventEnd);
95. return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
96. }
97. //结束线程池
98. inline void End()
99. {
100. SetEvent(_EventEnd);
101. }
102. inline DWORD Size()
103. {
104. return (DWORD)_lThreadNum;
105. }
106. inline DWORD GetRunningSize()
107. {
108. return (DWORD)_lRunningNum;
109. }
110. bool IsRunning()
111. {
112. return _lRunningNum > 0;
113. }
114. protected:
115. //⼯作线程
116. static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
117. {
118. ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
119. assert(pThread);
120. ThreadPool *pThreadPoolObj = pThread->_pThis;
121. assert(pThreadPoolObj);
122. InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
123. HANDLE hWaitHandle[3];
124. hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
125. hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
126. hWaitHandle[2] = pThreadPoolObj->_EventEnd;
127. JobItem *pJob;
128. bool fHasJob;
129.
130. for(;;)
131. {
132. DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
133. //响应删除线程信号
134. if(wr == WAIT_OBJECT_0 + 1)
135. break;
136.
137. //从队列⾥取得⽤户作业
138. EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
139. if(fHasJob = !pThreadPoolObj->_pty())
140. {
141. pJob = pThreadPoolObj->_JobQueue.front();
142. pThreadPoolObj->_JobQueue.pop();
143. assert(pJob);
144. }
145. LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
146. //受到结束线程信号确定是否结束线程(结束线程信号 && 是否还有⼯作)
147. if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)
148. break;
149. if(fHasJob && pJob)
150. {
151. InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
152. pThread->_dwLastBeginTime = GetTickCount();
153. pThread->_dwCount++;
154. pThread->_fIsRunning = true;
155. pJob->_pFunc(pJob->_pPara); //运⾏⽤户作业
156. delete pJob;
157. pThread->_fIsRunning = false;
158. InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
159. }
160. }
161. //删除⾃⾝结构
162. EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
163. pThreadPoolObj->_ase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_d(), pThread));
164. LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
165. delete pThread;
166. InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
167. if(!pThreadPoolObj->_lThreadNum) //所有线程结束
168. SetEvent(pThreadPoolObj->_EventComplete);
169. return 0;
170. }
171. //调⽤⽤户对象虚函数
172. static void CallProc(void *pPara)
173. {
174. CallProcPara *cp = static_cast<CallProcPara *>(pPara);
175. assert(cp);
176. if(cp)
177. {
178. cp->_pObj->DoJob(cp->_pPara);
179. delete cp;
180. }
181. }
182. //⽤户对象结构
183. struct CallProcPara
184. {
185. ThreadJob* _pObj;//⽤户对象
186. void *_pPara;//⽤户参数
187. CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
188. };
189. //⽤户函数结构
190. struct JobItem
191. {
192. void (*_pFunc)(void *);//函数
193. void *_pPara; //参数
194. JobItem(void (*pFunc)(void *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
195. };
196. //线程池中的线程结构
197. struct ThreadItem
waitforsingleobject函数
198. {
199. HANDLE _Handle; //线程句柄
200. ThreadPool *_pThis; //线程池的指针
201. DWORD _dwLastBeginTime; //最后⼀次运⾏开始时间
202. DWORD _dwCount; //运⾏次数
203. bool _fIsRunning;
204. ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { }; 205. ~ThreadItem()
206. {
207. if(_Handle)
208. {
209. CloseHandle(_Handle);
210. _Handle = NULL;
211. }
212. }
213. };
214.
215. std::queue<JobItem *> _JobQueue; //⼯作队列
216. std::vector<ThreadItem *> _ThreadVector; //线程数据
217. CRITICAL_SECTION _csThreadVector, _csWorkQueue; //⼯作队列临界, 线程数据临界
218. HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, ⼯作信号,删除线程信号
219. long _lThreadNum, _lRunningNum; //线程数, 运⾏的线程数
220. };
221. #endif //_ThreadPool_H_
转载⾃
基本上是拿来就⽤了,对WIN32 API不熟,但对线程池的逻辑还是⽐较熟的,认为这个线程池写得很清晰,我拿来⽤在⼀个多线程下载的模块中。很实⽤的东东。
调⽤⽅法
void threadfunc(void *p)
{
YourClass* yourObject = (YourClass*) p;
//... } ThreadPool tp; for(i=0; i<100; i++) tp.Call(threadfunc);
ThreadPool tp(20);//20为初始线程池规模
tp.Call(threadfunc, lpPara);
使⽤时注意⼏点:
1. ThreadJob 没什么⽤,直接写线程函数吧。
2. 线程函数(threadfunc)的⼊⼝参数void* 可以转成⾃定义的类型对象,这个对象可以记录下线程运⾏中的数据,并设置线程当前状态,以此与线程进⾏交互。
3. 线程池有⼀个EndAndWait函数,⽤于让线程池中所有计算正常结束。有时线程池中的⼀个线程可能要运⾏很长时间,怎么办?可以通过线程函数threadfunc的⼊⼝参数对象来处理,⽐如:
class YourClass {
int cmd; // cmd = 1是上线程停⽌计算,正常退出。
};
threadfunc(void* p) {
YourClass* yourObject = (YourClass*)p;
while (true) {
// do some calculation
if (yourClass->cmd == 1)
break;
}
}
在主线程中设置yourClass->cmd = 1,该线程就会⾃然结束。
很简洁通⽤的线程池实现。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论