2 * qpsnr (C) 2010 E. Oriani, ema <AT> fastwebnet <DOT> it
4 * This file is part of qpsnr.
6 * qpsnr is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * qpsnr is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with qpsnr. If not, see <http://www.gnu.org/licenses/>.
21 * myNZB (C) 2009 E. Oriani, ema <AT> fastwebnet <DOT> it
23 * This file is part of myNZB.
25 * myNZB is free software: you can redistribute it and/or modify
26 * it under the terms of the GNU General Public License as published by
27 * the Free Software Foundation, either version 3 of the License, or
28 * (at your option) any later version.
30 * myNZB is distributed in the hope that it will be useful,
31 * but WITHOUT ANY WARRANTY; without even the implied warranty of
32 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
33 * GNU General Public License for more details.
35 * You should have received a copy of the GNU General Public License
36 * along with myNZB. If not, see <http://www.gnu.org/licenses/>.
43 #include <semaphore.h>
// Exception type thrown by the threading helpers in this file. It derives
// from std::exception and keeps a copy of the message given at construction.
52 class mt_exception : public std::exception {
// Message text captured by the constructor.
53 const std::string _what;
// Builds the exception from a descriptive message, which is copied into _what.
55 mt_exception(const std::string& what) : _what(what) {
// Override of std::exception::what(), declared non-throwing. Its body is not
// visible in this excerpt; presumably it returns _what.c_str() (TODO confirm).
58 virtual const char* what() const throw() {
// Destructor, declared non-throwing.
62 ~mt_exception() throw () {
// Copy constructor and copy assignment are declared without a body here;
// presumably this is the pre-C++11 idiom for making the class non-copyable
// (TODO confirm they are private and never defined).
69 Semaphore(const Semaphore&);
70 Semaphore& operator=(const Semaphore&);
// Initialises the POSIX semaphore _sem with pshared = 0 and an initial value
// of 1. A non-zero return from sem_init() is reported as an mt_exception.
73 if (0 != sem_init(&_sem, 0, 1))
74 throw mt_exception("Semaphore: Semaphore()");
// Blocking acquire through sem_wait(). Any failure is turned into an
// mt_exception. The enclosing signature is not visible; the message suggests
// the member is called push (TODO confirm).
// NOTE(review): the message text is spelled Sempahore; it is a runtime string
// and is deliberately left unchanged here.
78 if (0 != sem_wait(&_sem))
79 throw mt_exception("Sempahore: push");
// Non-blocking acquire through sem_trywait(). Three outcomes are told apart:
// success (return value 0), would-block (errno == EAGAIN), and any other
// error, which throws. The values returned by the first two branches are not
// visible in this excerpt.
83 if (0 == sem_trywait(&_sem)) {
85 } else if (errno == EAGAIN) {
88 throw mt_exception("Semaphore: trywait");
// Release through sem_post(); a failure is turned into an mt_exception.
// NOTE(review): same Sempahore spelling in the runtime message as above.
92 if (0 != sem_post(&_sem))
93 throw mt_exception("Sempahore: pop");
// Underlying POSIX mutex handle. The code below initialises, locks, unlocks
// and destroys it.
102 pthread_mutex_t _mtx;
// Copy assignment is declared without a body; presumably this forbids copying
// the mutex (TODO confirm it is private and never defined).
105 Mutex& operator=(const Mutex&);
// Initialises _mtx with default attributes (NULL). A non-zero return code is
// reported as an mt_exception.
108 if (0 != pthread_mutex_init(&_mtx, NULL))
109 throw mt_exception("Mutex: Mutex()");
// Locks _mtx; throws mt_exception when pthread_mutex_lock() returns non-zero.
113 if (0 != pthread_mutex_lock(&_mtx))
114 throw mt_exception("Mutex: lock");
// Unlocks _mtx; throws mt_exception when pthread_mutex_unlock() returns
// non-zero.
118 if (0 != pthread_mutex_unlock(&_mtx))
119 throw mt_exception("Mutex: unlock");
// Destroys the mutex; the return code of pthread_mutex_destroy() is ignored.
// Presumably this is the destructor body (enclosing signature not visible).
123 pthread_mutex_destroy(&_mtx);
// Copy construction and copy assignment are declared without a body;
// presumably this makes the guard non-copyable (TODO confirm they are private
// and never defined).
130 ScopedLock(const ScopedLock&);
131 ScopedLock& operator=(const ScopedLock&);
// Binds the guard to the given Mutex by storing it in the _mtx member. The
// constructor body and the destructor are not visible in this excerpt;
// presumably they lock and unlock the mutex (TODO confirm).
133 ScopedLock(Mutex& mtx) : _mtx(mtx) {
// Static entry point handed to pthread_create() below. The opaque argument is
// the Thread instance that was passed as this, recovered with a C-style cast.
// The rest of the body is not visible; presumably it calls p->run()
// (TODO confirm). Declared throw(), i.e. no exception may leave it.
145 static void* exec(void* par) throw() {
147 Thread *p = (Thread*)par;
// Copy constructor and copy assignment are declared without a body;
// presumably this makes Thread non-copyable (TODO confirm).
154 Thread(const Thread&);
155 Thread& operator=(const Thread&);
// Work to be performed by the thread; pure virtual, so derived classes must
// provide it.
160 virtual void run(void) = 0;
// Creates the thread with default attributes, using exec as entry point and
// this object as its argument. A non-zero return code from pthread_create()
// is reported as an mt_exception.
163 if (0 != pthread_create(&_th, NULL, &exec, this))
164 throw mt_exception("Thread: start");
// Joins the thread only when _th is non-zero; the result of pthread_join()
// is ignored.
// NOTE(review): comparing a pthread_t with 0 assumes it is an arithmetic or
// pointer type, which POSIX does not guarantee -- confirm for the targets in
// use.
168 if (0!=_th) pthread_join(_th, NULL);
// ThreadPool needs direct access to the members below: its worker code reads
// _to_be_deleted and writes _on_hold.
178 friend class ThreadPool;
// Starts as true (see the constructor) and is set to false by the worker
// code once it has picked the job up.
// NOTE(review): volatile is used here as a cross-thread flag; it does not
// provide atomicity or ordering guarantees in C++.
180 volatile bool _on_hold;
// When true, the ThreadPool deletes the job object itself (after running it,
// or at teardown if it is still queued).
181 const bool _to_be_deleted;
// Creates a job in the on-hold state. to_be_deleted defaults to false, i.e.
// the caller keeps ownership of the object unless it asks otherwise.
183 Job(const bool& to_be_deleted = false) : _on_hold(true), _to_be_deleted(to_be_deleted) {
// The work to execute; pure virtual, to be provided by derived classes.
187 virtual void run(void) = 0;
189 /* Just remember that
191 - bool is_running(void)
192 can only be used when the ownership of the
193 ThreadPool::Job instance is external (e.g.
194 to_be_deleted = false).
195 Otherwise this will lead to a crash!
198 // We pop the semaphore again in case someone else
199 // will ask to wait again...
// Reports whether the job is currently being executed. A job that is still
// on hold (not yet picked up by a worker) is reported as not running.
// Otherwise the state is probed with a non-blocking attempt on the job
// semaphore. What is returned for each outcome of that attempt is not
// visible in this excerpt.
205 bool is_running(void) {
206 if (_on_hold) return false;
// Presumably a failed trypush() means the semaphore is currently held by the
// worker running the job (TODO confirm against the missing lines).
207 if (false == _sem.trypush())
// Queue of pending jobs: appended to with push_back() and consumed from the
// front by get_job(). The visible accesses all hold _list_mtx.
219 std::list<Job*> _list_jobs;
221 // the following variable is not mutex
222 // protected because it is sort of write-only
223 // by the main ThreadPool thread, see destructor
// NOTE(review): volatile gives no atomicity or ordering guarantees between
// threads in C++; the worker code reads this flag without a lock.
224 volatile bool _tp_quit;
// Number of worker threads, fixed at construction.
225 const unsigned int _n_execs;
// pthread identifiers of the workers, one slot per worker; filled by
// pthread_create() in the constructor and used for pthread_join().
226 std::vector<pthread_t> _th_ids;
// Takes the oldest pending job off the queue, if there is one, and stores it
// in *_job. The whole operation runs while _list_mtx is held through the
// ScopedLock. The return statements are not visible in this excerpt; the
// worker code treats a true result as meaning that a job was obtained.
228 bool get_job(Job** _job) {
229 ScopedLock _sl(_list_mtx);
230 if (!_list_jobs.empty()) {
// Hand out the front element and remove it from the queue.
231 *_job = _list_jobs.front();
232 _list_jobs.pop_front();
238 // Technically we should declare it as extern "C" but we
239 // don't care as long as the function pointer will correspond
240 // to a proper C-like function even if the name will be C++ like
// Entry point of every worker thread; par is the owning ThreadPool, as passed
// to pthread_create() by the constructor. Declared throw(), i.e. no exception
// may leave it. Several statements of the body (the loop construct, the
// semaphore calls and the call that runs the job) are not visible in this
// excerpt.
241 static void* job_exec(void* par) throw() {
243 ThreadPool *p = (ThreadPool*)par;
245 // first push on the semaphore
247 // check if we have to quit
248 if (p->_tp_quit) return 0;
249 // get an element to process
251 if (p->get_job(&curJob)) {
252 // we need to save the delete_job variable because
253 // after the semaphore has been popped the object
254 // might not exist anymore
255 const bool delete_job = curJob->_to_be_deleted;
// Mark the job as picked up, so it is no longer reported as on hold.
257 curJob->_on_hold = false;
261 // if the following instruction throws is better
262 // to let the user know because this means that
263 // the semaphore is not valid anymore...this should
266 // if it has to be deleted do it!
267 if (delete_job) delete curJob;
269 // check if we have to quit
270 if (p->_tp_quit) return 0;
// Copy constructor and copy assignment are declared without a body;
// presumably this makes ThreadPool non-copyable (TODO confirm).
277 ThreadPool(const ThreadPool&);
278 ThreadPool& operator=(const ThreadPool&);
280 // Just take into account that semaphores are not syscall immune
281 // so when you run in debug mode you can have exceptions thrown on
282 // push because of system interruption! Don't get scared!
// Creates a pool with n_execs worker threads, each running job_exec with
// this pool as argument.
// Throws mt_exception when n_execs is 0 or greater than 256, or when one of
// the threads cannot be created.
// NOTE(review): the initialiser list order differs from the declaration
// order of the members (_tp_quit, _n_execs, _th_ids); members are always
// initialised in declaration order. Also, _th_ids(n_execs) is constructed
// before the range check below runs.
283 ThreadPool(const unsigned int& n_execs) : _n_execs(n_execs), _th_ids(n_execs), _tp_quit(false) {
284 if (_n_execs == 0 || _n_execs > 256)
285 throw mt_exception("ThreadPool: invalid number of n_execs");
286 // create the job_exec threads
287 for (unsigned int i = 0; i < _n_execs; ++i)
288 if (0 != pthread_create(&_th_ids[i], NULL, &job_exec, this)) {
// Roll back: request cancellation of the workers started so far.
// NOTE(review): the cancelled threads are not joined in the visible code.
289 for (unsigned int j=0; j < i; ++j)
290 pthread_cancel(_th_ids[j]);
291 throw mt_exception("ThreadPool: could not start all specified n_execs");
// Tail of the job-submission member (its signature is not visible in this
// excerpt): the job pointer is appended to the pending queue while _list_mtx
// is held.
296 ScopedLock _sl(_list_mtx);
297 _list_jobs.push_back(job);
// Teardown code. The statement that belongs to the first loop is missing from
// this excerpt; presumably it wakes each worker so that it can observe the
// quit flag (TODO confirm). The second loop then waits for every worker
// thread to finish.
303 for (unsigned int i = 0; i < _n_execs; ++i)
305 for (unsigned int i = 0; i < _n_execs; ++i)
306 pthread_join(_th_ids[i], NULL);
307 // potentially unsafe...this could throw...but should never...
308 ScopedLock _sl(_list_mtx);
// Jobs that were queued but never executed are deleted here when the pool
// owns them (_to_be_deleted set); the others are left to their owner.
309 for (std::list<Job*>::iterator it = _list_jobs.begin(); it != _list_jobs.end(); ++it)
310 if ((*it)->_to_be_deleted) delete *it;