I have a WebSocket server built in Go that takes messages from a Kafka topic and pushes it out on a WebSocket connection to a client. This all works well.
However, when I try to add more clients to the WebSocket server on the same endpoint the next message from the Kafka topic is not sent to the new client. My intention is NOT TO broadcast the message to all the connected clients. If I have 100 messages in the Kafka topic, I want to push equal amount of messages to the connected clients. So in this case, 2 clients will get 50 each, 4 clients will get 25 each in a round robin fashion.
How can I achieve this in my code. The code I have written for WebSocket server is pretty standard and I can update this post with it if required. But it seems like this is something basic that I am missing about WebSockets.
Thanks
Nick
Websocket server code. This is not refactored to best practices yet, so please don't judge me :)
import (
"fmt"
"log"
"net/http"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
func secure(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("This is an example of TLS"))
}
func handleWebSocket(w http.ResponseWriter, req *http.Request) {
fmt.Println("Printing headers.............")
for name, values := range req.Header {
for _, value := range values {
fmt.Println(name, value)
}
}
conn, err := upgrader.Upgrade(w, req, nil)
if err != nil {
log.Fatal(err)
}
fmt.Println("Client Connected")
go writer(conn)
reader(conn)
if err != nil {
log.Println(err)
}
}
func writer(conn *websocket.Conn) {
defer func() {
conn.Close()
}()
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
"message.max.bytes": "1000000",
"enable.auto.commit": "true",
})
if err != nil {
fmt.Println(err)
}
c.Subscribe("testTopic", nil)
for {
msg, err := c.ReadMessage(-1)
if err = conn.WriteMessage(websocket.TextMessage, []byte(msg.Value)); err != nil {
log.Println("An error occured writing to the websocket")
return
}
low, high, _ := c.QueryWatermarkOffsets("testTopic", 0, 1000)
fmt.Println("Low and high watermark for the 0th partition is: ", low, high)
if err != nil {
fmt.Println(err)
}
}
}
func reader(conn *websocket.Conn) {
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.Println(err)
return
}
fmt.Println(string(p))
if err := conn.WriteMessage(messageType, p); err != nil {
log.Println(err)
return
}
}
}
func main() {
http.HandleFunc("/secure", secure)
http.HandleFunc("/ws", handleWebSocket)
fmt.Println("Started TLS Server....")
err := http.ListenAndServeTLS(":443", "localhost.crt", "localhost.key", nil)
if err != nil {
fmt.Println(err)
}
}
Once I run the above code I simulate a client using
websocat wss://localhost:443/ws
The messages are delivered well to this single instance.
However, when I spin more instances of "websocat" from the same machine they don't get any messages from the websocket unless I disconnect the first one. My expectation was that any messages from Kafka would get load balanced between the number of websocat clients.