Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


c++ - Thread pooling in C++11

How do I get a pool of threads that I can send tasks to, without creating and destroying the threads over and over again? In other words, I want persistent threads that can be resynchronized without joining them.


I have code that looks like this:

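#include <algorithm>
#include <thread>
#include <vector>

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[5] = {0}; // arr[0..3]: one slot per worker; arr[4]: running minimum

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < total; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    workers.clear(); // drop the joined threads before the next iteration
    arr[4] = *std::min_element(arr, arr + 4); // min_element returns an iterator
  }
  return 0;
}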

Instead of creating and joining threads each iteration, I'd prefer to send tasks to my worker threads each iteration and only create them once.


1 Reply


This is copied from my answer to another very similar post:

  1. Start with the number of threads the hardware can run concurrently (std::thread::hardware_concurrency() is only a hint and may return 0):

    int num_threads = std::thread::hardware_concurrency();
    
  2. For an efficient thread pool implementation, once num_threads threads have been created, it is better not to create new ones or destroy old ones (by joining). Doing so carries a performance penalty and might even make your application slower than the serial version.
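
Because the value from step 1 can be 0 when the implementation cannot determine it, a small guard may be worth adding before sizing the pool. The following is only a sketch: pick_thread_count and its fallback parameter are hypothetical names that do not appear in the answer.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper (not part of the original answer): turn the value
// reported by the hardware into a usable worker count.
unsigned pick_thread_count(unsigned detected, unsigned fallback)
{
    assert(fallback > 0); // the fallback itself must be a usable count
    // hardware_concurrency() returns 0 when it cannot determine a value.
    return detected != 0 ? detected : fallback;
}

// Convenience wrapper that queries the hardware; the default of 2 is an
// arbitrary assumption, not a recommendation from the answer.
unsigned default_thread_count()
{
    return pick_thread_count(std::thread::hardware_concurrency(), 2);
}
```

With this guard the pool always starts at least one worker, even on platforms where the hint is unavailable.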

Each C++11 thread should run its own function containing an infinite loop, constantly waiting for new tasks to grab and run.

Here is how to attach such a function to the thread pool:

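int num_threads = std::thread::hardware_concurrency();
std::vector<std::thread> threads; // kept as a Pool member so shutdown() can join them
for (int i = 0; i < num_threads; i++)
{
    // Infinite_loop_function is a member of Pool, so bind it to this instance.
    threads.push_back(std::thread(&Pool::Infinite_loop_function, this));
}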

  3. The infinite loop function. This is a while (true) loop that waits on the task queue:

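void Pool::Infinite_loop_function()
{
    while (true)
    {
        std::function<void()> Job;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            // Sleep until there is work or the pool is shutting down.
            condition.wait(lock, [this](){
                return !queue.empty() || terminate_pool;
            });
            // On shutdown, exit once the remaining jobs have been drained;
            // calling queue.front() on an empty queue would be undefined.
            if (terminate_pool && queue.empty())
            {
                return;
            }
            Job = std::move(queue.front());
            queue.pop();
        }

        Job(); // run outside the lock; function<void()> type
    }
}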

  4. Make a function to add a job to your queue:

void Pool::Add_Job(std::function<void()> New_Job)
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        queue.push(std::move(New_Job));
    }

    // Notify after releasing the lock so the woken worker can acquire it at once.
    condition.notify_one();
}

  5. Bind an arbitrary function to your queue:

Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
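
A lambda can be used in place of std::bind and tends to be easier to read when arguments are involved. The sketch below builds the same kind of std::function<void()> job both ways; Counter, make_bound_job, make_lambda_job and run_jobs are hypothetical stand-ins for Some_Class and the call above, and the jobs are invoked directly rather than through a pool.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for Some_Class from the answer.
struct Counter {
    int value = 0;
    void increment() { ++value; }
    void add(int n) { value += n; }
};

// Same form as the answer: bind a member function to an object pointer.
std::function<void()> make_bound_job(Counter& c)
{
    return std::bind(&Counter::increment, &c);
}

// Lambda alternative: capture the object by reference and the argument by value.
std::function<void()> make_lambda_job(Counter& c, int n)
{
    return [&c, n]() { c.add(n); };
}

// Runs one job of each kind on the calling thread and checks the effect.
int run_jobs()
{
    Counter c;
    make_bound_job(c)();      // value becomes 1
    make_lambda_job(c, 5)();  // value becomes 6
    assert(c.value == 6);
    return c.value;
}
```

In both forms the job only refers to the object, so the object has to outlive every job that was queued with it.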

Once you integrate these ingredients, you have your own dynamic thread pool. The threads keep running for the lifetime of the pool, waiting for jobs to do.

I apologize if there are some syntax errors; I typed this code from memory, and my memory is bad. Sorry that I cannot provide the complete thread pool code; that would violate my job integrity.

Edit: to terminate the pool, call the shutdown() method:

void Pool::shutdown()
{
    {
        // Lock the same mutex the workers wait on, so the flag change is not missed.
        std::unique_lock<std::mutex> lock(queue_mutex);
        terminate_pool = true; // use this flag in condition.wait
    }

    condition.notify_all(); // wake up all threads.

    // Join all threads.
    for (std::thread &th : threads)
    {
        th.join();
    }

    threads.clear();
    stopped = true; // use this flag in destructor, if not set, call shutdown()
}

Note: the anonymous code blocks are used so that when they are exited, the std::unique_lock variables created within them go out of scope, unlocking the mutex.
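
For readers who want to see the pieces fit together, here is one possible way to assemble them into a single class and apply it to the loop from the question. This is a minimal sketch under stated assumptions, not the author's withheld implementation: the constructor, the std::future<void> returned by Add_Job (used to wait for a batch without joining any thread) and the run_iterations function are all additions that do not appear in the answer.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class Pool {
public:
    explicit Pool(unsigned num_threads) {
        assert(num_threads > 0);
        for (unsigned i = 0; i < num_threads; ++i)
            threads.emplace_back(&Pool::Infinite_loop_function, this);
    }
    ~Pool() { if (!stopped) shutdown(); }
    Pool(const Pool&) = delete;
    Pool& operator=(const Pool&) = delete;

    // Assumption: returning a future is an addition to the answer's Add_Job.
    // The future becomes ready once the job has finished running.
    std::future<void> Add_Job(std::function<void()> New_Job) {
        auto task = std::make_shared<std::packaged_task<void()>>(std::move(New_Job));
        std::future<void> done = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            queue.push([task]() { (*task)(); });
        }
        condition.notify_one();
        return done;
    }

    void shutdown() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            terminate_pool = true;
        }
        condition.notify_all();
        for (std::thread& th : threads) th.join();
        threads.clear();
        stopped = true;
    }

private:
    void Infinite_loop_function() {
        while (true) {
            std::function<void()> Job;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock, [this]() { return !queue.empty() || terminate_pool; });
                if (terminate_pool && queue.empty()) return;
                Job = std::move(queue.front());
                queue.pop();
            }
            Job(); // run the job outside the lock
        }
    }

    std::vector<std::thread> threads;
    std::queue<std::function<void()>> queue;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool terminate_pool = false;
    bool stopped = false;
};

// The question's loop: four workers are created once and reused for all
// eight iterations. Returns the sum of the four slots.
int run_iterations() {
    int arr[4] = {0};
    Pool pool(4);
    for (int i = 0; i < 8; ++i) {
        std::vector<std::future<void>> pending;
        for (int j = 0; j < 4; ++j)
            pending.push_back(pool.Add_Job([&arr, j]() { arr[j] += 2; }));
        for (auto& f : pending) f.get(); // resynchronize without joining
    }
    pool.shutdown();
    return arr[0] + arr[1] + arr[2] + arr[3]; // 4 slots * 8 iterations * 2 = 64
}
```

Waiting on the futures at the end of each iteration plays the role that join() played in the question's code, while the worker threads themselves stay alive until shutdown(). On some toolchains the program has to be linked with the platform's thread library (for example -pthread with GCC).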



...