Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
497 views
in Technique[技术] by (71.8m points)

java - Kafka only subscribe to latest message?

Sometimes(seems very random) Kafka sends old messages. I only want the latest messages so it overwrite messages with the same key. Currently it looks like I have multiple messages with the same key it doesn't get compacted.

I use this setting in the topic:

cleanup.policy=compact

I'm using Java/Kotlin and Apache Kafka 1.1.1 client.

Properties(8).apply {
    val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username="%s" password="%s";"
    val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS)
    put(ConsumerConfig.GROUP_ID_CONFIG,
            "ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer::class.java.name)

    put("security.protocol", "SASL_SSL")
    put("sasl.mechanism", "SCRAM-SHA-256")
    put("sasl.jaas.config", jaasCfg)
    put("max.poll.records", 100)
    put("receive.buffer.bytes", 1000000)
}

Have I missed some settings?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

If you want have only one value for each key, you have to use KTable<K,V> abstraction: StreamsBuilder::table(final String topic) from Kafka Streams. Topic used here should have cleanup policy set to compact.

If you use KafkaConsumer you just pull data from brokers. It doesn't give you any mechanism that perform some kind of deduplication. Depending on if compaction was performed or not, you can get one to n messages for same key.

Regarding compaction

Compaction doesn't mean, that all old value for same key are removed immediately. When old message for same key will be removed, depends on several properties. The most important are:

  • log.cleaner.min.cleanable.ratio

The minimum ratio of dirty log to total log for a log to eligible for cleaning

  • log.cleaner.min.compaction.lag.ms

The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.

  • log.cleaner.enable

Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size.

More detail about compaction you can find https://kafka.apache.org/documentation/#compaction


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...