Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
195 views
in Technique[技术] by (71.8m points)

c++ - Implementation of a lock free vector

After several searches, I cannot find a lock-free vector implementation. There is a document that speaks about it but nothing concrete (in any case I have not found it). http://pirkelbauer.com/papers/opodis06.pdf

There are currently 2 threads dealing with arrays, there may be more in a while.

One thread that updates different vectors and another thread that accesses the vector to do calculations, etc. Each thread accesses the different array a large number of times per second.

I implemented a lock with mutex on the different vectors but when the reading or writing thread takes too long to unlock, all further updates are delayed. I then thought of copying the array all the time to go faster, but copying thousands of times per second an array of thousands of elements doesn't seem great to me.

So I thought to use 1 mutex per value in each table to lock only the value I am working on.

A lock-free could be better but I can not find a solution and I wonder if the performances would be really better.

EDIT:

I have a thread that receives data and ranges in vectors. When I instantiate the structure, I use a fixed size.

I have to do 2 different things for the updates:

-Update vector elements. (1d vector which simulates a 2d vector)

-Add a line at the end of the vector and remove the first line. The array always remains sorted. Adding elements is much much rarer than updating

The thread that is read-only walks the array and performs calculations. To limit the time spent on the array and do as little calculation as possible, I use arrays that store the result of my calculations. Despite this, I often have to scan the table enough to do new calculations or just update them. (the application is in real-time so the calculations to be made vary according to the requests)

When a new element is added to the vector, the reading thread must directly use it to update the calculations.

When I say calculation, it is not necessarily only arithmetic, it is more a treatment to be done.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

There is no perfect implementation to run concurrency, each task has it's own good enogh. My goto method to find a decent implementation is to only alow what is needed and then check if i would need somthing more in the future. You described a quite simple scenario, one thread one accion to a shared vector, then the vector needs to tell if the acction is alowed soo std::atomic_flag is good enogh.

This example shuld give you an idea on how it works and what to expent. Mainly i just attached a flag to eatch array and checkt it before to see if is safe to do somthing and some people like to add a guard to the flag, just in case.

#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>

const int vector_size = 1024;

struct Element {
    void some_yield(){
        std::this_thread::yield();
    };
    void some_wait(){
        std::this_thread::sleep_for(
            std::chrono::microseconds(1)
        );
    };
};

Element ** data;
std::atomic_flag * vector_safe;


bool alive = true;
uint32_t c_down_time = 0;
uint32_t p_down_time = 0;
uint32_t c_intinerations = 0;
uint32_t p_intinerations = 0;
std::chrono::high_resolution_clock::time_point c_time_point;
std::chrono::high_resolution_clock::time_point p_time_point;

int simple_consumer_work(){
    Element a_read;
    uint16_t i, e;
    while (alive){
        // Loops thru the vectors
        for (i=0; i < vector_size; i++){
            // locks the thread untin the vector 
            // at index i is free to read
            while (!vector_safe[i].test_and_set()){} 
                // Do the watherver
                for (e=0; e < vector_size; e++){
                    a_read = data[i][e];
                } 
            // And signal that this vector is done
            vector_safe[i].clear();
        }
    }
    return 0;
};
int simple_producer_work(){
    uint16_t i;
    while (alive){
        for (i=0; i < vector_size; i++){
            while (!vector_safe[i].test_and_set()){} 
            data[i][i].some_wait();
            vector_safe[i].clear();
        }
        p_intinerations++;
    }
    return 0;
};

int consumer_work(){
    Element a_read;
    uint16_t i, e;
    bool waiting;
    while (alive){

        for (i=0; i < vector_size; i++){
            waiting = false;
            c_time_point = std::chrono::high_resolution_clock::now();
            while (!vector_safe[i].test_and_set(std::memory_order_acquire)){
                waiting = true;
            } 
            if (waiting){
                c_down_time += (uint32_t)std::chrono::duration_cast<std::chrono::nanoseconds> 
                (std::chrono::high_resolution_clock::now() - c_time_point).count();
            }  
            for (e=0; e < vector_size; e++){
                a_read = data[i][e];
            } 
            vector_safe[i].clear(std::memory_order_release);
        }
        c_intinerations++;
    }
    return 0;
};
int producer_work(){
    bool waiting;
    uint16_t i;
    while (alive){
        for (i=0; i < vector_size; i++){
            waiting = false;
            p_time_point = std::chrono::high_resolution_clock::now();
            while (!vector_safe[i].test_and_set(std::memory_order_acquire)){
                waiting = true;
            } 
            if (waiting){
                p_down_time += (uint32_t)std::chrono::duration_cast<std::chrono::nanoseconds> 
                (std::chrono::high_resolution_clock::now() - p_time_point).count();
            } 
            data[i][i].some_wait();
            vector_safe[i].clear(std::memory_order_release);
        }
        p_intinerations++;
    }
    return 0;
};

void print_time(uint32_t down_time){
    if ( down_time <= 1000) {
        std::cout << down_time << " [nanosecods] 
";

    } else if (down_time <= 1000000) {
        std::cout << down_time / 1000 << " [microseconds] 
";
    
    } else if (down_time <= 1000000000) {
        std::cout << down_time / 1000000 << " [miliseconds] 
";
    
    } else {
        std::cout << down_time / 1000000000 << " [seconds] 
";
    }
};

int main(){

    std::uint16_t i;
    std::thread consumer;
    std::thread producer;

    vector_safe = new std::atomic_flag [vector_size] {ATOMIC_FLAG_INIT};
    data = new Element * [vector_size];
    for(i=0; i < vector_size; i++){
        data[i] = new Element;
    }

    consumer = std::thread(consumer_work);
    producer = std::thread(producer_work);

    std::this_thread::sleep_for(
        std::chrono::seconds(10)
    );

    alive = false;
    producer.join();
    consumer.join();

    std::cout << " Consumer loops > " << c_intinerations << std::endl;
    std::cout << " Consumer time lost > "; print_time(c_down_time);
    std::cout << " Producer loops > " << p_intinerations << std::endl;
    std::cout << " Producer time lost > "; print_time(p_down_time);

    for(i=0; i < vector_size; i++){
        delete data[i];
    }
    delete [] vector_safe;
    delete [] data;

    return 0;
}

And dont forget that the compiler can and will change portions of the code, spagueti code is realy realy buggy in multithreading.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...