sjero.net Git - qpsnr/blob - src/mt.h
Initial Commit of QPSNR (version 0.2.1)
[qpsnr] / src / mt.h
1 /*
2 *       qpsnr (C) 2010 E. Oriani, ema <AT> fastwebnet <DOT> it
3 *
4 *       This file is part of qpsnr.
5 *
6 *       qpsnr is free software: you can redistribute it and/or modify
7 *       it under the terms of the GNU General Public License as published by
8 *       the Free Software Foundation, either version 3 of the License, or
9 *       (at your option) any later version.
10 *
11 *       qpsnr is distributed in the hope that it will be useful,
12 *       but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *       GNU General Public License for more details.
15 *
16 *       You should have received a copy of the GNU General Public License
17 *       along with qpsnr.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 /*
21 *       myNZB (C) 2009 E. Oriani, ema <AT> fastwebnet <DOT> it
22 *
23 *       This file is part of myNZB.
24 *
25 *       myNZB is free software: you can redistribute it and/or modify
26 *       it under the terms of the GNU General Public License as published by
27 *       the Free Software Foundation, either version 3 of the License, or
28 *       (at your option) any later version.
29 *
30 *       myNZB is distributed in the hope that it will be useful,
31 *       but WITHOUT ANY WARRANTY; without even the implied warranty of
32 *       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
33 *       GNU General Public License for more details.
34 *
35 *       You should have received a copy of the GNU General Public License
36 *       along with myNZB.  If not, see <http://www.gnu.org/licenses/>.
37 */
38
39 #ifndef _MT_H_
40 #define _MT_H_
41
42 #include <pthread.h>
43 #include <semaphore.h>
44 #include <list>
45 #include <vector>
46 #include <errno.h>
47 #include <exception>
48 #include <string>
49
50 namespace mt {
51
52         class mt_exception : public std::exception {
53                 const std::string       _what;
54         public:
55                 mt_exception(const std::string& what) : _what(what) {
56                 }
57
58                 virtual const char* what() const throw() {
59                         return _what.c_str();
60                 }
61
62                 ~mt_exception() throw () {
63                 }
64         };
65
66         class Semaphore {
67                 sem_t   _sem;
68
69                 Semaphore(const Semaphore&);
70                 Semaphore& operator=(const Semaphore&);
71         public:
72                 Semaphore() {
73                         if (0 != sem_init(&_sem, 0, 1))
74                                 throw mt_exception("Semaphore: Semaphore()");
75                 }
76
77                 void push(void) {
78                         if (0 != sem_wait(&_sem))
79                                 throw mt_exception("Sempahore: push");
80                 }
81
82                 bool trypush(void) {
83                         if (0 == sem_trywait(&_sem)) {
84                                 return true;
85                         } else if (errno == EAGAIN) {
86                                 return false;
87                         }
88                         throw mt_exception("Semaphore: trywait");
89                 }
90
91                 void pop(void) {
92                         if (0 != sem_post(&_sem))
93                                 throw mt_exception("Sempahore: pop");
94                 }
95                 
96                 ~Semaphore() {
97                         sem_destroy(&_sem);
98                 }
99         };
100
101         class Mutex {
102                 pthread_mutex_t _mtx;
103
104                 Mutex(const Mutex&);
105                 Mutex& operator=(const Mutex&);
106         public:
107                 Mutex() {
108                         if (0 != pthread_mutex_init(&_mtx, NULL))
109                                 throw mt_exception("Mutex: Mutex()");
110                 }
111
112                 void lock(void) {
113                         if (0 != pthread_mutex_lock(&_mtx))
114                                 throw mt_exception("Mutex: lock");
115                 }
116
117                 void unlock(void) {
118                         if (0 != pthread_mutex_unlock(&_mtx))
119                                 throw mt_exception("Mutex: unlock");
120                 }
121
122                 ~Mutex() {
123                         pthread_mutex_destroy(&_mtx);
124                 }
125         };
126
127         class ScopedLock {
128                 Mutex& _mtx;
129                 
130                 ScopedLock(const ScopedLock&);
131                 ScopedLock& operator=(const ScopedLock&);
132         public:
133                 ScopedLock(Mutex& mtx) : _mtx(mtx) {
134                         _mtx.lock();
135                 }
136
137                 ~ScopedLock() {
138                         _mtx.unlock();
139                 }
140         };
141
142         class Thread {
143                 pthread_t       _th;
144
145                 static void* exec(void* par) throw() {
146                         try {
147                                 Thread *p = (Thread*)par;
148                                 p->run();
149                         } catch (...) {
150                         }
151                         return 0;
152                 }
153
154                 Thread(const Thread&);
155                 Thread& operator=(const Thread&);
156         public:
157                 Thread() : _th(0) {
158                 }
159
160                 virtual void run(void) = 0;
161
162                 void start(void) {
163                         if (0 != pthread_create(&_th, NULL, &exec, this))
164                                         throw mt_exception("Thread: start");
165                 }
166
167                 void join(void) {
168                         if (0!=_th) pthread_join(_th, NULL);
169                 }
170                 
171                 virtual ~Thread() {
172                 }
173         };
174
175         class ThreadPool {
176         public:
177                 class Job {
178                         friend  class   ThreadPool;
179                         Semaphore       _sem;
180                         volatile bool   _on_hold;
181                         const bool      _to_be_deleted;
182                 public:
183                         Job(const bool& to_be_deleted = false) : _on_hold(true), _to_be_deleted(to_be_deleted) {
184                                 _sem.push();
185                         }
186
187                         virtual void run(void) = 0;
188
189                         /* Just rememeber that
190                         - void wait(void)
191                         - bool is_running(void)
192                            can only be used when the ownership of the
193                            ThreadPool::Job instance is external (eg.
194                            to_be_deleted = false).
195                            Otherwise this will lead to a crash!
196                         */
197
198                         // We pop the semaphore again in case someone else
199                         // will ask to wait again...
200                         void wait(void) {
201                                 _sem.push();
202                                 _sem.pop();
203                         };
204
205                         bool is_running(void) {
206                                 if (_on_hold) return false;
207                                 if (false == _sem.trypush())
208                                         return true;
209                                 _sem.pop();
210                                 return false;
211                         }
212
213                         virtual ~Job() {
214                         }
215                 };
216         private:
217                 Mutex           _list_mtx;
218                 Semaphore       _list_sem;
219                 std::list<Job*> _list_jobs;
220
221                 // the following variable is not mutex
222                 // protected because is sort of write-only
223                 // by main ThreadPool thread, see destructor
224                 volatile bool           _tp_quit;
225                 const unsigned int      _n_execs;
226                 std::vector<pthread_t>  _th_ids;
227
228                 bool get_job(Job** _job) {
229                         ScopedLock _sl(_list_mtx);
230                         if (!_list_jobs.empty()) {
231                                 *_job = _list_jobs.front();
232                                 _list_jobs.pop_front();
233                                 return true;
234                         }
235                         return false;
236                 }
237
238                 // Technically we should declare it as extern "C" but we
239                 // don't care as seen as the function pointer will correspond
240                 // to a proper C-like function even if the name will be C++ like
241                 static void* job_exec(void* par) throw() {
242                         try {
243                                 ThreadPool *p = (ThreadPool*)par;
244                                 while(true) {
245                                         // first push on the semaphore
246                                         p->_list_sem.push();
247                                         // check if we have to quit
248                                         if (p->_tp_quit) return 0;
249                                         // get an element to process
250                                         Job     *curJob = 0;
251                                         if (p->get_job(&curJob)) {
252                                                 // we need to save the delete_job varibale because
253                                                 // after the semaphore has been popped the object 
254                                                 // could not exist anymore
255                                                 const bool delete_job = curJob->_to_be_deleted;
256                                                 try {
257                                                         curJob->_on_hold = false;
258                                                         curJob->run();
259                                                 } catch(...) {
260                                                 }
261                                                 // if the following instruction throws is better 
262                                                 // to let the user know because this means that
263                                                 // the semaphore is not valid anymore...this should
264                                                 // never happen...
265                                                 curJob->_sem.pop();
266                                                 // if it has to be deleted do it!
267                                                 if (delete_job) delete curJob;
268                                         }
269                                         // check if we have to quit
270                                         if (p->_tp_quit) return 0;
271                                 }
272                         } catch(...) {
273                         }
274                         return 0;
275                 }
276
277                 ThreadPool(const ThreadPool&);
278                 ThreadPool& operator=(const ThreadPool&);
279         public:
280                 // Just take into account that semaphores are not syscall immune
281                 // so when you run in debug mode you can have exceptions thrown on
282                 // push because of system interrruption! Don't get scared!
283                 ThreadPool(const unsigned int& n_execs) : _n_execs(n_execs), _th_ids(n_execs), _tp_quit(false) {
284                         if (_n_execs == 0 || _n_execs > 256)
285                                 throw mt_exception("ThreadPool: invalid number of n_execs");
286                         // create the job_exec threads
287                         for (unsigned int i = 0; i < _n_execs; ++i)
288                                 if (0 != pthread_create(&_th_ids[i], NULL, &job_exec, this)) {
289                                         for (unsigned int j=0; j < i; ++j)
290                                                 pthread_cancel(_th_ids[j]);
291                                         throw mt_exception("ThreadPool: could not start all specified n_execs");
292                                 }
293                 }
294
295                 void add(Job* job) {
296                         ScopedLock _sl(_list_mtx);
297                         _list_jobs.push_back(job);
298                         _list_sem.pop();
299                 }
300
301                 ~ThreadPool() {
302                         _tp_quit = true;
303                         for (unsigned int i = 0; i < _n_execs; ++i)
304                                 _list_sem.pop();
305                         for (unsigned int i = 0; i < _n_execs; ++i)
306                                 pthread_join(_th_ids[i], NULL);
307                         // potentialy unsafe...this could throw...but should never...
308                         ScopedLock _sl(_list_mtx);
309                         for (std::list<Job*>::iterator it = _list_jobs.begin(); it != _list_jobs.end(); ++it)
310                                 if ((*it)->_to_be_deleted) delete *it;
311                 }
312         };
313 }
314
315 #endif //_MT_H_