Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

go - How to perform load balancing using WebSockets

I have a WebSocket server built in Go that takes messages from a Kafka topic and pushes them out on a WebSocket connection to a client. This all works well.

However, when I add more clients to the WebSocket server on the same endpoint, the next message from the Kafka topic is not sent to the new client. My intention is not to broadcast each message to all connected clients. If there are 100 messages in the Kafka topic, I want to push an equal number of messages to each connected client in round-robin fashion: with 2 clients, each gets 50; with 4 clients, each gets 25.
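To make the intended behaviour concrete, here is a minimal sketch of round-robin distribution using only the standard library. It is not taken from the server below: `Dispatcher`, `Add`, `Remove`, and `Dispatch` are hypothetical names, and buffered channels stand in for the per-client WebSocket connections.

```go
package main

import (
	"fmt"
	"sync"
)

// Dispatcher hands each message to exactly one registered client,
// cycling through the clients in registration order.
// The type and its methods are hypothetical names for this sketch.
type Dispatcher struct {
	mu      sync.Mutex
	clients []chan<- string
	next    int
}

// Add registers a client's outbound channel.
func (d *Dispatcher) Add(c chan<- string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.clients = append(d.clients, c)
}

// Remove unregisters a client, for example after its connection closes.
func (d *Dispatcher) Remove(c chan<- string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	for i, cl := range d.clients {
		if cl == c {
			d.clients = append(d.clients[:i], d.clients[i+1:]...)
			if d.next > i {
				d.next-- // keep the rotation pointing at the same client
			}
			return
		}
	}
}

// Dispatch sends msg to the next client in rotation and reports false
// when no client is registered. It blocks while a client's channel is
// full, so this sketch assumes buffered channels or fast readers.
func (d *Dispatcher) Dispatch(msg string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if len(d.clients) == 0 {
		return false
	}
	d.next %= len(d.clients)
	d.clients[d.next] <- msg
	d.next++
	return true
}

func main() {
	d := &Dispatcher{}
	a := make(chan string, 100) // stand-in for client 1
	b := make(chan string, 100) // stand-in for client 2
	d.Add(a)
	d.Add(b)
	for i := 0; i < 100; i++ {
		d.Dispatch(fmt.Sprintf("msg-%d", i))
	}
	fmt.Println(len(a), len(b)) // prints: 50 50
}
```

In a real server, one goroutine per connection would presumably drain its channel and write to the WebSocket, while a single goroutine reads from Kafka and calls `Dispatch`.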

How can I achieve this in my code? The code I have written for the WebSocket server is pretty standard, and I can update this post with it if required. But it seems like I am missing something basic about WebSockets.

Thanks, Nick

WebSocket server code. This is not refactored to best practices yet, so please don't judge me :)



package main

import (
    "fmt"
    "log"
    "net/http"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

func secure(w http.ResponseWriter, req *http.Request) {
    w.Write([]byte("This is an example of TLS"))
}

func handleWebSocket(w http.ResponseWriter, req *http.Request) {

    fmt.Println("Printing headers.............")
    for name, values := range req.Header {
        for _, value := range values {
            fmt.Println(name, value)
        }
    }
    conn, err := upgrader.Upgrade(w, req, nil)
    if err != nil {
        // Log and return: log.Fatal here would stop the whole server
        // because of one failed handshake.
        log.Println(err)
        return
    }
    fmt.Println("Client Connected")
    go writer(conn)
    reader(conn)
}

func writer(conn *websocket.Conn) {
    defer conn.Close()

    // Each WebSocket connection creates its own consumer in the same group.
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost",
        "group.id":           "myGroup",
        "auto.offset.reset":  "earliest",
        "message.max.bytes":  "1000000",
        "enable.auto.commit": "true",
    })
    if err != nil {
        log.Println("failed to create Kafka consumer:", err)
        return
    }
    defer c.Close()

    if err := c.Subscribe("testTopic", nil); err != nil {
        log.Println("failed to subscribe:", err)
        return
    }

    for {
        msg, err := c.ReadMessage(-1)
        if err != nil {
            // msg is not usable when ReadMessage fails, so skip this iteration.
            log.Println(err)
            continue
        }
        if err := conn.WriteMessage(websocket.TextMessage, msg.Value); err != nil {
            log.Println("An error occurred writing to the websocket:", err)
            return
        }
        low, high, err := c.QueryWatermarkOffsets("testTopic", 0, 1000)
        if err != nil {
            log.Println(err)
            continue
        }
        fmt.Println("Low and high watermark for the 0th partition is: ", low, high)
    }
}

func reader(conn *websocket.Conn) {
    for {

        messageType, p, err := conn.ReadMessage()
        if err != nil {
            log.Println(err)
            return
        }

        fmt.Println(string(p))

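        // Note: gorilla/websocket supports only one concurrent writer per
        // connection, and writer() also writes to conn from its own goroutine.
        if err := conn.WriteMessage(messageType, p); err != nil {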
            log.Println(err)
            return
        }
    }
}

func main() {

    http.HandleFunc("/secure", secure)
    http.HandleFunc("/ws", handleWebSocket)
    fmt.Println("Started TLS Server....")
    err := http.ListenAndServeTLS(":443", "localhost.crt", "localhost.key", nil)
    if err != nil {
        fmt.Println(err)
    }
}


Once I run the above code, I simulate a client using:
websocat wss://localhost:443/ws

The messages are delivered correctly to this single instance.

However, when I spin up more instances of "websocat" from the same machine, they don't get any messages from the WebSocket unless I disconnect the first one. My expectation was that messages from Kafka would be load balanced across the connected websocat clients.
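One possible explanation, assuming testTopic has a single partition: Kafka assigns each partition to at most one consumer within a group, so the extra consumers created for later connections in `myGroup` would sit idle until the first one leaves. A common alternative is to read from Kafka once per server and let the connection goroutines share the work. The sketch below illustrates that idea with the standard library only: `fanOut` is a hypothetical helper, a slice stands in for the Kafka topic, and a counter stands in for `conn.WriteMessage`. Each message reaches exactly one worker, but unlike strict round robin the split depends on scheduling and may be uneven.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut starts n workers that all receive from one shared channel.
// Each message is delivered to exactly one worker. It returns how many
// messages each worker handled.
func fanOut(messages []string, n int) []int {
	shared := make(chan string)
	counts := make([]int, n)
	var wg sync.WaitGroup

	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Stand-in for a per-connection writer goroutine.
			for range shared {
				counts[id]++ // each worker touches only its own slot
			}
		}(w)
	}

	// Stand-in for the single goroutine that reads from Kafka.
	for _, m := range messages {
		shared <- m
	}
	close(shared)
	wg.Wait()
	return counts
}

func main() {
	msgs := make([]string, 100)
	for i := range msgs {
		msgs[i] = fmt.Sprintf("msg-%d", i)
	}
	// The per-worker counts vary from run to run; they always sum to 100.
	fmt.Println(fanOut(msgs, 4))
}
```

If an exactly even split matters, an explicit rotation over the connected clients is the more predictable choice; the shared channel is simpler and naturally favours clients that keep up.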


1 Reply

Awaiting an expert's reply.
