Well, that's really quite simple: you're rejecting the tasks that are posted!
// Posts `task` to io_service_ only while available_ is positive. When
// available_ is 0 (or less) the function returns without posting anything, so
// the task is silently dropped rather than queued.
// NOTE(review): the parameter type is spelled `task` although the template
// parameter is `Task` -- looks like a typo; confirm against the original class.
template< typename Task >
void run_task(task task){
// Acquires mutex_ (waiting if another thread owns it); released at scope exit.
boost::unique_lock<boost::mutex> lock( mutex_ );
// Plain counter check: holding mutex_ does not make this wait for capacity.
if(0 < available_) {
--available_;
// Type-erase the task and post a bound call to tpool::wrap_task with it.
io_service_.post(boost::bind(&tpool::wrap_task, this, boost::function< void() > ( task )));
}
// No else branch: nothing is posted when available_ was not positive.
}
Note that constructing the lock
"waits" until the mutex is not owned by another thread. That might already be the case right away, and possibly at a moment when available_
is already 0. Now look at the line
if(0 < available_) {
This line is simply a condition. It does not become "magical" just because you are holding mutex_
locked. (The program doesn't even know that any relation exists between mutex_
and available_.)
So, if available_ <= 0,
you will just skip posting the job.
Solution #1
You should let the io_service
do the queuing for you. This is likely what you wanted to achieve in the first place. Instead of you keeping track of "available" threads, io_service
does that work for you. You control how many threads it may use by running the io_service
on exactly that many threads. Simple.
Since io_service
is already thread-safe, you can do without the lock.
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
// tpool class
// It's always closed. :glasses:
#ifndef __POOL_H
#define __POOL_H
/// Thread pool that uses a boost::asio::io_service as its task queue.
///
/// Tasks passed to run_task() are posted to io_service_. The constructor and
/// destructor are declared here and defined out of line.
class tpool {
public:
    /// @param tpool_size  requested pool size (consumed by the out-of-line
    ///                    definition).
    ///
    /// Declared `explicit` so that a plain integer is never silently
    /// converted into a whole thread pool.
    explicit tpool( std::size_t tpool_size );
    ~tpool();

    /// Queues `task` for execution by posting it to the io_service.
    ///
    /// @param task  callable to run; taken by value and handed to post().
    template<typename Task>
    void run_task(Task task){
        // No locking needed here: io_service is already thread-safe.
        io_service_.post(task);
    }

private:
    // Declaration order is significant. Members are destroyed in reverse
    // order of declaration, so threads_ goes first, then work_, and
    // io_service_ last -- after everything that refers to it.
    boost::asio::io_service io_service_;
    boost::asio::io_service::work work_;
    boost::thread_group threads_;
};
extern tpool dbpool;
#endif
#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
//#include "pool.h"
// Binds work_ to io_service_, then starts `tpool_size` threads that each call
// io_service_.run().
tpool::tpool(std::size_t tpool_size) : work_(io_service_) {
    std::size_t remaining = tpool_size;
    while (remaining != 0) {
        threads_.create_thread(
            boost::bind(&boost::asio::io_service::run, &io_service_));
        --remaining;
    }
}
// Stops the io_service and then waits for every pool thread to finish.
tpool::~tpool() {
    // Ask the io_service to stop first so the threads blocked in run() can
    // return; only then is joining them able to complete.
    io_service_.stop();

    try {
        threads_.join_all();
    } catch (...) {
        // Deliberately swallowed: nothing may propagate out of a destructor.
    }
}
// Demo task: prints its own function signature followed by a newline.
// (The newline escape had been turned into a raw line break inside the string
// literal, which does not compile.)
void foo() { std::cout << __PRETTY_FUNCTION__ << "\n"; }
// Demo task: prints its own function signature followed by a newline.
// (The newline escape had been turned into a raw line break inside the string
// literal, which does not compile.)
void bar() { std::cout << __PRETTY_FUNCTION__ << "\n"; }
int main() {
tpool dbpool(50);
dbpool.run_task(foo);
dbpool.run_task(bar);
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
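For a graceful shutdown, you will want to be able to "clear" (destroy) the io_service::work
object: as long as it exists, io_service::run() never returns on its own, so the pool can only be ended with stop(), as the destructor above does. Without either, your pool will never exit.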
Solution #2
Don't use io_service;
instead, roll your own queue implementation with a condition variable to notify a worker thread whenever new work is posted. Again, the number of workers is determined by the number of threads in the group.
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
/// Minimal thread pool: a mutex-protected job queue plus a condition variable
/// that wakes worker threads when a job is enqueued or the pool shuts down.
///
/// The number of workers is boost::thread::hardware_concurrency(), with a
/// floor of one so that enqueued jobs always have a thread to run them.
class thread_pool
{
  private:
    mutex mx;                     // guards _queue; used together with cv
    condition_variable cv;        // notified on enqueue and on shutdown

    typedef function<void()> job_t;
    std::deque<job_t> _queue;     // pending jobs, taken from the front

    thread_group pool;            // the worker threads
    boost::atomic_bool shutdown;  // set by the destructor to end the workers

    /// Worker loop: keeps running jobs until dequeue() returns an empty
    /// optional.
    static void worker_thread(thread_pool& q)
    {
        while (auto job = q.dequeue())
            (*job)();
    }

  public:
    /// Starts the worker threads.
    thread_pool() : shutdown(false) {
        // hardware_concurrency() returns 0 when the number of hardware
        // threads cannot be determined. Without a floor that would create no
        // workers at all and every enqueued job would be left unexecuted.
        // Query it once instead of on every loop iteration.
        const unsigned detected = boost::thread::hardware_concurrency();
        const unsigned worker_count = detected ? detected : 1u;

        for (unsigned i = 0; i < worker_count; ++i)
            pool.create_thread(bind(worker_thread, ref(*this)));
    }

    /// Appends `job` to the queue and wakes one waiting worker.
    void enqueue(job_t job)
    {
        lock_guard<mutex> lk(mx);
        _queue.push_back(std::move(job));
        cv.notify_one();
    }

    /// Blocks until a job is available or shutdown has been requested.
    ///
    /// @return the next job, or an empty optional when the queue is empty,
    ///         which after the wait below means shutdown was requested.
    ///         Jobs still queued at shutdown are handed out before the empty
    ///         optional is returned.
    optional<job_t> dequeue()
    {
        unique_lock<mutex> lk(mx);

        // The predicate is evaluated with mx held.
        cv.wait(lk, [this] { return shutdown.load() || !_queue.empty(); });

        if (_queue.empty())
            return none;

        auto job = std::move(_queue.front());
        _queue.pop_front();
        // Converting return (job_t -> optional<job_t>): the explicit move
        // keeps this a move rather than a copy on older compilers.
        return std::move(job);
    }

    /// Requests shutdown, wakes every worker and waits for them to finish.
    ~thread_pool()
    {
        shutdown = true;
        {
            // Notify while holding mx so that a worker which has evaluated
            // the predicate but is not yet waiting cannot miss the wake-up.
            lock_guard<mutex> lk(mx);
            cv.notify_all();
        }
        pool.join_all();
    }
};
void the_work(int id)
{
std::cout << "worker " << id << " entered
";
// no more synchronization; the pool size determines max concurrency
std::cout << "worker " << id << " start work
";
this_thread::sleep_for(chrono::seconds(2));
std::cout << "worker " << id << " done
";
}
int main()
{
thread_pool pool; // uses 1 thread per core
for (int i = 0; i < 10; ++i)
pool.enqueue(bind(the_work, i));
}
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…