See More

/////////////////////////////////////////////////////////////////////////////// // // Copyright (C) 2008-2012 Artyom Beilis (Tonkikh) // // See accompanying file COPYING.TXT file for licensing details. // /////////////////////////////////////////////////////////////////////////////// #define CPPCMS_SOURCE #include #include #include #include #include #include #include #include #include #include #if defined(CPPCMS_POSIX) #include #endif namespace cppcms { namespace impl { class thread_pool : public booster::noncopyable { public: bool cancel(int id) { booster::unique_lock<:mutex> lock(mutex_); queue_type::iterator p; for(p=queue_.begin();p!=queue_.end();++p) { if(p->first==id) { queue_.erase(p); return true; } } return false; } int post(booster::function const &job) { booster::unique_lock<:mutex> lock(mutex_); int id=job_id_++; queue_.push_back(std::make_pair(id,job)); cond_.notify_one(); return id; } thread_pool(int threads) : shut_down_(false), job_id_(0) { workers_.resize(threads); #if defined(CPPCMS_POSIX) sigset_t set,old; sigfillset(&set); pthread_sigmask(SIG_BLOCK,&set,&old); #endif for(int i=0;i lock(mutex_); shut_down_=true; cond_.notify_all(); } for(unsigned i=0;i thread=workers_[i]; workers_[i].reset(); if(thread) thread->join(); } } ~thread_pool() { try { stop(); } catch(...) { } } private: void worker() { for(;;) { booster::function job; { booster::unique_lock<:mutex> lock(mutex_); if(shut_down_) return; if(!queue_.empty()) { queue_.front().second.swap(job); queue_.pop_front(); } else { cond_.wait(lock); } } if(job) { try { job(); } catch(std::exception const &e) { BOOSTER_ERROR("cppcms") << "Catched exception in thread pool" << e.what() <<'\n' << booster::trace(e); } catch(...) { BOOSTER_ERROR("cppcms") << "Catched unknown exception in thread pool"; } } } } booster::mutex mutex_; booster::condition_variable cond_; bool shut_down_; int job_id_; typedef std::list<:pair> > > queue_type; queue_type queue_; std::vector<:shared_ptr> > workers_; }; } thread_pool::thread_pool(int n) : impl_(new impl::thread_pool(n)) { } int thread_pool::post(booster::function const &job) { return impl_->post(job); } void thread_pool::stop() { impl_->stop(); } bool thread_pool::cancel(int id) { return impl_->cancel(id); } thread_pool::~thread_pool() { } } // cppcms