Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

concurrency - Best way to implement global counters for highly concurrent applications?

What is the best way to implement global counters for a highly concurrent application? In my case I may have 10K-20K goroutines performing "work", and I want to count the number and types of items that the goroutines are collectively working on.

The "classic" synchronous coding style would look like:

var work_counter int32 // atomic.AddInt32 requires an int32, not an int

func GoWorkerRoutine() {
    for {
        // do work
        atomic.AddInt32(&work_counter,1)
    }    
}

Now this gets more complicated because I want to track the "type" of work being done, so really I'd need something like this:

var work_counter = make(map[string]int) // must be initialized; writing to a nil map panics
var work_mux sync.Mutex

func GoWorkerRoutine() {
    for {
        // do work
        work_mux.Lock()
        work_counter["type1"]++
        work_mux.Unlock()
    }    
}

It seems like there should be a "go" optimized way using channels or something similar to this:

var work_counter int
var work_chan chan int // make() called somewhere else (buffered)

// started somewhere else
func GoCounterRoutine() {
    // A single receive needs no select; the loop ends when work_chan is closed.
    for c := range work_chan {
        work_counter += c
    }
}

func GoWorkerRoutine() {
    for {
        // do work
        work_chan <- 1
    }    
}

This last example is still missing the map, but that's easy enough to add. Will this style provide better performance than just a simple atomic increment? I can't tell if this is more or less complicated when we're talking about concurrent access to a global value versus something that may block on I/O to complete...
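
For reference, adding the map to the channel version might look roughly like the sketch below. One goroutine owns the map and workers only send messages, so no lock is needed. The names `increment` and `runCounter`, the buffer size of 64, and the worker counts are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// increment is a hypothetical message type: which kind of work, and how much.
type increment struct {
	kind string
	n    int
}

// runCounter is the only goroutine that touches the map. When in is closed
// it sends the final counts on done.
func runCounter(in <-chan increment, done chan<- map[string]int) {
	counts := make(map[string]int)
	for inc := range in {
		counts[inc.kind] += inc.n
	}
	done <- counts
}

func main() {
	in := make(chan increment, 64) // buffered, size chosen arbitrarily
	done := make(chan map[string]int)
	go runCounter(in, done)

	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 10; i++ {
				// do work, then report it
				in <- increment{"type1", 1}
			}
		}()
	}
	wg.Wait()
	close(in) // no more senders, so the counter goroutine can finish
	fmt.Println((<-done)["type1"]) // prints 40
}
```

Whether this beats an atomic increment or a mutex is exactly the open question here; the sketch only shows the shape of the code, not its performance.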

Thoughts are appreciated.

Update 5/28/2013:

I tested a couple of implementations, and the results were not what I expected. Here's my counter source code:

package helpers

type CounterIncrementStruct struct {
    bucket string
    value int
}

type CounterQueryStruct struct {
    bucket string
    channel chan int
}

var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int

func CounterInitialize() {
    counter = make(map[string]int)
    counterIncrementChan = make(chan CounterIncrementStruct,0)
    counterQueryChan = make(chan CounterQueryStruct,100)
    counterListChan = make(chan chan map[string]int,100)
    go goCounterWriter()
}

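// goCounterWriter is the only goroutine that touches the counter map.
func goCounterWriter() {
    for {
        select {
            case ci := <- counterIncrementChan:
                // an empty bucket name acts as a shutdown signal
                if len(ci.bucket)==0 { return }
                counter[ci.bucket]+=ci.value
            case cq := <- counterQueryChan:
                val,found:=counter[cq.bucket]
                if found {
                    cq.channel <- val
                } else {
                    cq.channel <- -1 // -1 means the bucket does not exist
                }
            case cl := <- counterListChan:
                // reply with a copy so callers never share the live map
                nm := make(map[string]int)
                for k, v := range counter {
                    nm[k] = v
                }
                cl <- nm
        }
    }
}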

// The parameter is named value so it does not shadow the package-level counter map.
func CounterIncrement(bucket string, value int) {
    if len(bucket)==0 || value==0 { return }
    counterIncrementChan <- CounterIncrementStruct{bucket,value}
}

func CounterQuery(bucket string) int {
    if len(bucket)==0 { return -1 }
    reply := make(chan int)
    counterQueryChan <- CounterQueryStruct{bucket,reply}
    return <- reply
}

func CounterList() map[string]int {
    reply := make(chan map[string]int)
    counterListChan <- reply
    return <- reply
}

It uses channels for both writes and reads, which seems logical.

Here are my test cases:

package helpers

import (
    "sync"
    "testing"
)

func bcRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkChannels(b *testing.B) {
    b.StopTimer()

    CounterInitialize()
    e:=make(chan bool)
    b.StartTimer()

    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

var mux sync.Mutex
var m map[string]int
func bmIncrement(bucket string, value int) {
    mux.Lock()
    m[bucket]+=value
    mux.Unlock()
}

func bmRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkMutex(b *testing.B) {
    b.StopTimer()
    m=make(map[string]int)
    e:=make(chan bool)
    b.StartTimer()

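    // Note: this single-goroutine loop has no counterpart in BenchmarkChannels,
    // so this benchmark does extra work on top of the five goroutines below.
    for i := 0; i < b.N; i++ {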
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }

    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

I implemented a simple benchmark with just a mutex around the map (just testing writes), and benchmarked both with 5 goroutines running in parallel. Here are the results:

$ go test --bench=. helpers
PASS
BenchmarkChannels     100000         15560 ns/op
BenchmarkMutex       1000000          2669 ns/op
ok      helpers 4.452s

I would not have expected the mutex to be that much faster...

Further thoughts?
