12 #ifndef ROOT_TThreadPool
13 #define ROOT_TThreadPool
33 #define sleep(s) _sleep(s)
48 TNonCopyable(
const TNonCopyable&);
49 const TNonCopyable& operator=(
const TNonCopyable&);
69 template <
class aTask,
class aParam>
70 class TThreadPoolTaskImp {
72 bool run(aParam ¶m) {
73 aTask *pThis =
reinterpret_cast<aTask *
>(
this);
74 return pThis->runTask(param);
86 template <
class aTask,
class aParam>
87 class TThreadPoolTask {
89 typedef TThreadPoolTaskImp<aTask, aParam> task_t;
92 TThreadPoolTask(task_t &task, aParam ¶m):
97 return fTask.run(fTaskParam);
115 template <
class aTask,
class aParam>
116 class TThreadPool :
public TNonCopyable {
118 typedef TThreadPoolTask<aTask, aParam> task_t;
119 typedef std::queue<task_t*> taskqueue_t;
120 typedef std::vector<TThread*> threads_array_t;
123 TThreadPool(
size_t threadsCount,
bool needDbg =
false):
127 fIdleThreads(threadsCount),
129 fThreadNeeded =
new TCondition(&fMutex);
130 fThreadAvailable =
new TCondition(&fMutex);
131 fAllTasksDone =
new TCondition(&fMutexAllTasksDone);
133 for (
size_t i = 0; i < threadsCount; ++i) {
134 TThread *pThread =
new TThread(&TThreadPool::Executor,
this);
135 fThreads.push_back(pThread);
139 fThreadJoinHelper =
new TThread(&TThreadPool::JoinHelper,
this);
142 fThreadMonitor =
new TThread(&TThreadPool::Monitor,
this);
143 fThreadMonitor->Run();
150 threads_array_t::const_iterator iter = fThreads.begin();
151 threads_array_t::const_iterator iter_end = fThreads.end();
152 for (; iter != iter_end; ++iter)
155 delete fThreadJoinHelper;
157 delete fThreadNeeded;
158 delete fThreadAvailable;
159 delete fAllTasksDone;
163 TLockGuard lock(&fMutex);
164 TThread *pThread =
new TThread(&TThreadPool::Executor,
this);
165 fThreads.push_back(pThread);
170 void PushTask(
typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
172 DbgLog(
"Main thread. Try to push a task");
174 TLockGuard lock(&fMutex);
175 task_t *t =
new task_t(task, param);
179 DbgLog(
"Main thread. the task is pushed");
181 TLockGuard lock(&fMutex);
182 fThreadNeeded->Broadcast();
185 void Stop(
bool processRemainingJobs =
false) {
190 if (processRemainingJobs) {
191 TLockGuard lock(&fMutex);
193 while (!fTasks.empty() && !fStopped) {
194 DbgLog(
"Main thread is waiting");
195 fThreadAvailable->Wait();
196 DbgLog(
"Main thread is DONE waiting");
201 TLockGuard lock(&fMutex);
203 fThreadNeeded->Broadcast();
204 DbgLog(
"Main threads requests to STOP");
208 fThreadJoinHelper->Run();
209 fThreadJoinHelper->Join();
215 TLockGuard lock(&fMutexAllTasksDone);
216 fAllTasksDone->Wait();
219 size_t TasksCount()
const {
223 size_t SuccessfulTasks()
const {
224 return fSuccessfulTasks;
227 size_t IdleThreads()
const {
232 static void* Monitor(
void *arg) {
236 TThreadPool *pThis =
reinterpret_cast<TThreadPool*
>(arg);
237 while (
true && !pThis->fStopped) {
238 std::stringstream ss;
240 <<
">>>> Check for tasks."
241 <<
" Number of Tasks: " << pThis->fTasks.size()
242 <<
"; Idle threads: " << pThis->IdleThreads();
243 pThis->DbgLog(ss.str());
249 static void* Executor(
void *arg) {
250 TThreadPool *pThis =
reinterpret_cast<TThreadPool*
>(arg);
252 while (!pThis->fStopped) {
258 TLockGuard lock(&pThis->fMutex);
259 if (pThis->fTasks.empty() && !pThis->fStopped) {
260 pThis->DbgLog(
"waiting for a task");
262 if (pThis->fThreads.size() == pThis->fIdleThreads) {
263 TLockGuard l(&pThis->fMutexAllTasksDone);
264 pThis->fAllTasksDone->Broadcast();
268 pThis->fThreadNeeded->Wait();
270 pThis->DbgLog(
"done waiting for tasks");
275 TLockGuard lock(&pThis->fMutex);
276 if (!pThis->fTasks.empty()) {
277 --pThis->fIdleThreads;
278 task = pThis->fTasks.front();
281 pThis->DbgLog(
"get the task");
282 }
else if (pThis->fThreads.size() == pThis->fIdleThreads) {
283 TLockGuard l(&pThis->fMutexAllTasksDone);
284 pThis->fAllTasksDone->Broadcast();
286 pThis->DbgLog(
"done Check <<<<");
291 pThis->DbgLog(
"Run the task");
294 TLockGuard lock(&pThis->fMutex);
295 ++pThis->fSuccessfulTasks;
300 TLockGuard lock(&pThis->fMutex);
301 ++pThis->fIdleThreads;
303 pThis->DbgLog(
"Done Running the task");
306 TLockGuard lock(&pThis->fMutex);
307 pThis->fThreadAvailable->Broadcast();
310 pThis->DbgLog(
"**** DONE ***");
314 static void *JoinHelper(
void *arg) {
315 TThreadPool *pThis =
reinterpret_cast<TThreadPool*
>(arg);
316 threads_array_t::const_iterator iter = pThis->fThreads.begin();
317 threads_array_t::const_iterator iter_end = pThis->fThreads.end();
318 for (; iter != iter_end; ++iter)
324 static bool IsThreadActive(TThread *pThread) {
326 return (pThread->GetState() == TThread::kRunningState);
329 void DbgLog(
const std::string &msg) {
332 TLockGuard lock(&fDbgOutputMutex);
333 std::cout <<
"[" << TThread::SelfId() <<
"] " << msg << std::endl;
339 TCondition *fThreadNeeded;
340 TCondition *fThreadAvailable;
341 TMutex fMutexAllTasksDone;
342 TCondition *fAllTasksDone;
343 threads_array_t fThreads;
344 TThread *fThreadJoinHelper;
345 TThread *fThreadMonitor;
346 volatile bool fStopped;
347 size_t fSuccessfulTasks;
350 TMutex fDbgOutputMutex;