Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


c++ - Why is std::queue not thread-safe?

The title says it all. I don't understand why std::queue (or, more generally, any queue) is not thread-safe by nature when, unlike other data structures, no iterators are involved.

According to the common rule that

  • at least one thread is writing to ...
  • and another thread is reading from a shared resource

I should have gotten a conflict in the following example code:

#include "stdafx.h"
#include <queue>
#include <thread>
#include <iostream>

struct response
{
    static int & getCount()
    {
        static int theCount = 0;
        return theCount;
    }

    int id;
};


std::queue<response> queue;

// generate 100 response objects and push them into the queue
void produce()
{
    for (int i = 0; i < 100; i++)
    {
        response r; 
        r.id = response::getCount()++;
        queue.push(r);
        std::cout << "produced: " << r.id << std::endl;
    }
}

// get the 100 first responses from the queue
void consume()
{
    int consumedCounter = 0;
    for (;;)
    {       
        if (!queue.empty())
        {
            std::cout << "consumed: " << queue.front().id << std::endl;
            queue.pop();
            consumedCounter++;
        }

        if (consumedCounter == 100)
            break;
    }
}

int _tmain(int argc, _TCHAR* argv[])
{

    std::thread t1(produce);
    std::thread t2(consume);

    t1.join();
    t2.join();

    return 0;
}

Everything seems to be working fine:

  • No integrity is violated and no data is corrupted.
  • The consumer receives the elements in the correct order (0<1<2<3<4...). Of course, the order in which the producer and the consumer print is random, as there is no signaling involved.


1 Reply


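Imagine you check !queue.empty(), enter the next block, and before you reach queue.front() another thread removes (pops) the one and only element, so you end up querying an empty queue. Even with a single consumer, as in the question, calling push() in one thread while another thread calls empty(), front() or pop() without synchronization is a data race and therefore undefined behavior, so a test run that appears to work does not show that the code is correct.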
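The gap between the check and the access can be closed by doing both under one lock. A minimal sketch, assuming the hypothetical names guardedQueue, queueMutex, guarded_push and try_pop (none of them appear in the question):

```cpp
#include <mutex>
#include <queue>

// Hypothetical names for illustration only.
std::queue<int> guardedQueue;
std::mutex queueMutex;

// Push under the same mutex that the consumer uses.
void guarded_push(int value)
{
    std::lock_guard<std::mutex> lock(queueMutex);
    guardedQueue.push(value);
}

// The emptiness check, the read and the removal all happen under one lock,
// so no other thread can empty the queue between empty() and front().
// Returns false without blocking if the queue is empty.
bool try_pop(int& out)
{
    std::lock_guard<std::mutex> lock(queueMutex);
    if (guardedQueue.empty())
        return false;
    out = guardedQueue.front();
    guardedQueue.pop();
    return true;
}
```

This version never blocks; a consumer that gets false has to retry or wait by some other means.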

Using a synchronized queue like the following avoids the problem:

#pragma once

#include <deque>
#include <mutex>
#include <condition_variable>
#include <utility>

    template <typename T>
    class SharedQueue
    {
    public:
        SharedQueue();
        ~SharedQueue();

        T& front();
        void pop_front();

        void push_back(const T& item);
        void push_back(T&& item);

        int size();
        bool empty();

    private:
        std::deque<T> queue_;
        std::mutex mutex_;
        std::condition_variable cond_;
    }; 

    template <typename T>
    SharedQueue<T>::SharedQueue(){}

    template <typename T>
    SharedQueue<T>::~SharedQueue(){}

    template <typename T>
    T& SharedQueue<T>::front()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        while (queue_.empty())
        {
            cond_.wait(mlock);
        }
        return queue_.front();
    }

    template <typename T>
    void SharedQueue<T>::pop_front()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        while (queue_.empty())
        {
            cond_.wait(mlock);
        }
        queue_.pop_front();
    }     

    template <typename T>
    void SharedQueue<T>::push_back(const T& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        queue_.push_back(item);
        mlock.unlock();     // unlock before notification to minimize mutex contention
        cond_.notify_one(); // notify one waiting thread

    }

    template <typename T>
    void SharedQueue<T>::push_back(T&& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        queue_.push_back(std::move(item));
        mlock.unlock();     // unlock before notification to minimize mutex contention
        cond_.notify_one(); // notify one waiting thread

    }

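    template <typename T>
    int SharedQueue<T>::size()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return static_cast<int>(queue_.size());
    }

    // Reports whether the queue is currently empty; the answer may be
    // stale as soon as the lock is released.
    template <typename T>
    bool SharedQueue<T>::empty()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }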

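The call to front() blocks until the queue holds an element, and every member function locks the underlying queue so that only one thread accesses it at a time. The lock is released when front() returns, however, so with more than one consumer the front() and pop_front() pair is still not atomic.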
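A sketch of one way to make the wait, the read and the removal a single step, assuming a hypothetical BlockingQueue class with a wait_and_pop member (neither is part of the answer above). The run_demo function is also hypothetical and mirrors the question's setup of one producer and one consumer:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical class for illustration; not the SharedQueue from the answer.
template <typename T>
class BlockingQueue
{
public:
    void push(T item)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(item));
        }
        cond_.notify_one(); // notify after the lock has been released
    }

    // Waits for an element, then removes and returns it under one lock.
    T wait_and_pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop_front();
        return item;
    }

private:
    std::deque<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

// One producer and one consumer exchange 100 ids, as in the question.
std::vector<int> run_demo()
{
    BlockingQueue<int> q;
    std::vector<int> consumed;

    std::thread producer([&q] {
        for (int i = 0; i < 100; ++i)
            q.push(i);
    });
    std::thread consumer([&q, &consumed] {
        for (int i = 0; i < 100; ++i)
            consumed.push_back(q.wait_and_pop());
    });

    producer.join();
    consumer.join();
    return consumed;
}
```

Returning the element by value costs a move or copy, but the caller never holds a reference into the queue after the lock is gone.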

