Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


How does Kafka decide which consumer reads a message within a single consumer group?

I'm wondering whether there is any logic that determines which consumer reads a message within the same consumer group. I have a single topic and a single consumer group, but there can be more than one consumer: one is deployed in the production environment, and when I run my application locally, another consumer is created that subscribes to the same topic (it's a test project, not real production, so I'm not worried about data loss). Interestingly, I noticed that the local consumer always consumes every message, so it looks as if the consumer created later takes precedence.

Is it possible to configure Kafka so that the consumer created earlier takes precedence for reads?

My setup includes 3 brokers and 1 consumer group ID. In addition, auto.offset.reset is set to earliest (changing it to latest doesn't resolve the issue). I'm using the confluent-kafka-go library for Kafka. This is my setup code:

```go
import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// conf is the application's own settings value (defined elsewhere, not shown).
func getConfig() *kafka.ConfigMap {
    return &kafka.ConfigMap{
        "metadata.broker.list": conf.KafkaBrokers,
        "security.protocol":    "SASL_SSL",
        "sasl.mechanisms":      "SCRAM-SHA-256",
        "sasl.username":        conf.KafkaUsername,
        "sasl.password":        conf.KafkaPassword,
        // Every consumer started with this group.id joins the same group.
        "group.id":             conf.KafkaGroupID,
        // Used only when the group has no committed offset for a partition.
        "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
        //"debug":                           "generic,broker,security",
    }
}
```
Question from: https://stackoverflow.com/questions/65640981/how-does-kafka-decide-which-consumer-reads-a-message-within-a-single-consumer-gr


Answer:


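Within a consumer group, each partition is consumed by exactly one consumer. When consumers join the group, one of them (the group leader) computes the assignment, i.e. the list of partitions each consumer will handle. If the topic has fewer partitions than the group has consumers, for example a single partition, the extra consumers are assigned nothing and stay idle, which would explain why only one of your consumers ever receives messages.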

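In your client, this can be configured via partition.assignment.strategy. In librdkafka, which confluent-kafka-go wraps, the default is range,roundrobin, so range is used as long as all members support it. It follows the implementation of Apache Kafka's RangeAssignor.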
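As a config fragment (not runnable on its own), the strategy can be set on the same kafka.ConfigMap used in the question. strategyConfig is a hypothetical helper and its parameters are placeholders; since the group leader picks a strategy that every member supports, all consumers in the group should list the strategy you want.

```go
package consumer

import "github.com/confluentinc/confluent-kafka-go/kafka"

// strategyConfig is a hypothetical helper: it builds a minimal consumer
// configuration with an explicit partition.assignment.strategy.
func strategyConfig(brokers, groupID string) *kafka.ConfigMap {
	return &kafka.ConfigMap{
		"bootstrap.servers": brokers,
		"group.id":          groupID,
		// Replaces librdkafka's default list ("range,roundrobin").
		// Other accepted values include "range" and "cooperative-sticky".
		"partition.assignment.strategy": "roundrobin",
	}
}
```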

Quoting the Javadoc:

The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.

For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.

The assignment will be:

C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]

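Consumers are ordered by their member ID, which is generated on the broker side from the consumer's client.id followed by a random UUID. Unless the client.id values differ, the order is therefore effectively random and does not depend on which consumer joined the group first.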
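To illustrate why the ordering does not follow join order, this sketch builds IDs in a client.id-plus-UUID layout and sorts them the way the range strategy orders consumers. memberID is a hypothetical helper and the exact "<client.id>-<uuid>" format is an assumption; real member IDs are generated by the broker, not by the client. "rdkafka" is librdkafka's default client.id.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"sort"
)

// memberID is hypothetical: it mimics an ID made of the client.id followed by
// a random (version 4) UUID, which is assumed here to resemble broker IDs.
func memberID(clientID string) string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		panic(err)
	}
	b[6] = (b[6] & 0x0f) | 0x40 // UUID version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%s-%x-%x-%x-%x-%x", clientID, b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}

func main() {
	// Two consumers with the same client.id, listed in join order.
	joined := []string{memberID("rdkafka"), memberID("rdkafka")}
	sorted := append([]string(nil), joined...)
	sort.Strings(sorted)
	// Varies from run to run: only the random suffix decides the order.
	fmt.Println("first joiner sorts first:", sorted[0] == joined[0])
}
```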

In practice, it does not matter which consumer is assigned a given partition, so I wouldn't focus too much on that part. Instead, it's important to understand how partitions are assigned and to pick the strategy that best fits your use case.

For completeness, confluent-kafka-go also supports other strategies, such as roundrobin and cooperative-sticky.



...