EVHttpServer  1.0.0
A lightweight HTTP server based on libevent and a thread pool
ThreadPool.h
1 #ifndef THREAD_POOL_H
2 #define THREAD_POOL_H
3 
4 #include <vector>
5 #include <queue>
6 #include <memory>
7 #include <thread>
8 #include <mutex>
9 #include <condition_variable>
10 #include <future>
11 #include <functional>
12 #include <stdexcept>
13 
/**
 * Fixed-size pool of worker threads that execute queued tasks in FIFO order.
 *
 * The constructor starts the workers; the destructor raises the stop flag,
 * wakes every worker and joins them. Workers only exit once the stop flag is
 * set AND the queue is empty, so tasks that were already queued still run
 * during destruction.
 *
 * The class is neither copyable nor movable (it owns a std::mutex and a
 * std::condition_variable).
 */
class ThreadPool {
public:
    /// Start `threads` worker threads that wait for tasks.
    ThreadPool(size_t threads);

    /// Signal shutdown, wake all workers and join them.
    ~ThreadPool();

    /**
     * Queue the call f(args...) for execution on a worker thread.
     *
     * @return a future that yields the call's result.
     * @throws std::runtime_error if the pool has already been stopped.
     */
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;

private:
    /// Worker threads; kept so the destructor can join them.
    std::vector< std::thread > workers;

    /// Pending tasks, consumed front-to-back by the workers.
    std::queue< std::function<void()> > tasks;

    /// Guards `tasks` and `stop`.
    std::mutex queue_mutex;

    /// Signalled when a task is queued or when the pool is stopping.
    std::condition_variable condition;

    /// Set once shutdown has begun; read and written under `queue_mutex`.
    bool stop;
};
32 
33 // the constructor just launches some amount of workers
34 inline ThreadPool::ThreadPool(size_t threads)
35  : stop(false)
36 {
37  for(size_t i = 0;i<threads;++i)
38  workers.emplace_back(
39  [this]
40  {
41  for(;;)
42  {
43  std::function<void()> task;
44 
45  {
46  std::unique_lock<std::mutex> lock(this->queue_mutex);
47  this->condition.wait(lock,
48  [this]{ return this->stop || !this->tasks.empty(); });
49  if(this->stop && this->tasks.empty())
50  return;
51  task = std::move(this->tasks.front());
52  this->tasks.pop();
53  }
54 
55  task();
56  }
57  }
58  );
59 }
60 
61 // add new work item to the pool
62 template<class F, class... Args>
63 auto ThreadPool::enqueue(F&& f, Args&&... args)
64  -> std::future<typename std::result_of<F(Args...)>::type>
65 {
66  using return_type = typename std::result_of<F(Args...)>::type;
67 
68  auto task = std::make_shared< std::packaged_task<return_type()> >(
69  std::bind(std::forward<F>(f), std::forward<Args>(args)...)
70  );
71 
72  std::future<return_type> res = task->get_future();
73  {
74  std::unique_lock<std::mutex> lock(queue_mutex);
75 
76  // don't allow enqueueing after stopping the pool
77  if(stop)
78  throw std::runtime_error("enqueue on stopped ThreadPool");
79 
80  tasks.emplace([task](){ (*task)(); });
81  }
82  condition.notify_one();
83  return res;
84 }
85 
86 // the destructor joins all threads
87 inline ThreadPool::~ThreadPool()
88 {
89  {
90  std::unique_lock<std::mutex> lock(queue_mutex);
91  stop = true;
92  }
93  condition.notify_all();
94  for(std::thread &worker: workers)
95  worker.join();
96 }
97 
98 #endif
Definition: ThreadPool.h:14