Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

java - Spring kafka - Failed to construct kafka consumer

I'm having trouble getting my Kafka/Confluent Spring Boot project (built with Gradle) up and running. I originally had just a producer in this test project and everything ran well. I then added a Kafka consumer, and now I get an exception on startup. Can anyone spot the problem?

First, this is the stack trace:

2021-01-22 19:56:08.566  WARN 61123 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
2021-01-22 19:56:08.573  INFO 61123 --- [           main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2021-01-22 19:56:08.575  INFO 61123 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2021-01-22 19:56:08.576  INFO 61123 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-01-22 19:56:08.584  INFO 61123 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
2021-01-22 19:56:08.586  INFO 61123 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2021-01-22 19:56:08.597  INFO 61123 --- [           main] ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-01-22 19:36:06.216 ERROR 61013 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at com.test.DevlyAuthServiceApplication.main(DevlyAuthServiceApplication.java:39)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:358)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:326)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:302)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:269)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:243)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:639)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:305)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 14 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/IsolationLevel
    at io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor.configure(MonitoringConsumerInterceptor.java:46)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:376)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:436)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:417)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:404)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:711)
    ... 28 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.requests.IsolationLevel
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 34 common frames omitted

This is my build.gradle:

plugins {
    id 'org.springframework.boot' version '2.3.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
    id "com.commercehub.gradle.plugin.avro" version "0.21.0"
    id "idea"

}
group 'com.test.tge-auth-service'
version '1.0'

java {
    sourceCompatibility = JavaVersion.VERSION_14
    targetCompatibility = JavaVersion.VERSION_14
}

ext {
    avroVersion = "1.10.1"
}

repositories {
    mavenCentral()
    jcenter()
    maven {
        url "https://packages.confluent.io/maven/"
    }
}

avro {
    createSetters = true
    fieldVisibility = "PRIVATE"
}

dependencies {
//    providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'

    compile group: 'co.elastic.logging', name: 'logback-ecs-encoder', version: '0.5.2'
    compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.860'

    compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '3.0.0'
    compile group: 'io.springfox', name: 'springfox-boot-starter', version: '3.0.0'

    compile('org.springframework.boot:spring-boot-starter-data-elasticsearch')
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.data', name: 'spring-data-elasticsearch', version: '4.0.4.RELEASE'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-jpa', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-security', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.security', name: 'spring-security-oauth2-client', version: '5.4.0'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-validation', version: '2.4.2'
    compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.6.5'

    compile group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
    compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.11.2'

    compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.21'

    compile group: 'io.jsonwebtoken', name: 'jjwt', version: '0.9.1'
    compile group: 'org.openapitools', name: 'jackson-databind-nullable', version: '0.2.1'

    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'org.apache.commons', name: 'commons-collections4', version: '4.4'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
    compile group: 'org.passay', name: 'passay', version: '1.6.0'
    compile group: 'com.google.guava', name: 'guava', version: '30.0-jre'

    compile group: 'io.confluent', name: 'kafka-schema-registry-client', version: '5.4.0'
    compile group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.4.0'
    compile group: 'io.confluent', name: 'monitoring-interceptors', version: '5.4.0'
    compile(group: 'io.confluent', name: 'kafka-streams-avro-serde', version:'5.4.0') {
        exclude(module: 'log4j-over-slf4j')
    }

    compile "org.apache.avro:avro:1.10.1"
    implementation "org.apache.avro:avro:${avroVersion}"

    compileOnly 'org.projectlombok:lombok:1.18.12'
    annotationProcessor 'org.projectlombok:lombok:1.18.12'

    implementation 'com.amazonaws:aws-java-sdk-s3'
    implementation 'org.springframework.boot:spring-boot-starter-web'

    testCompile group: 'junit', name: 'junit', version: '4.12'
    testCompileOnly 'org.projectlombok:lombok:1.18.12'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.12'

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }

    jar {
        manifest {
            attributes(
                    'Main-Class': 'com.test.SpringBootPersistenceApplication'
            )
        }
        from {
            configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
        }
    }
}

Here is my producer, which works:

import com.test.messages.avro.model.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@CommonsLog(topic = "Producer Logger")
@RequiredArgsConstructor
public class ProducerK {

  @Value("${topic.name}")
  private String TOPIC;

  private final KafkaTemplate<String, User> kafkaTemplate;

  void sendMessage(User user) {
    this.kafkaTemplate.send(this.TOPIC, "key", user);
    log.info(String.format("Produced user -> %s", user));
  }
}

And finally, here is my consumer:

import com.test.messages.avro.model.User;
import lombok.extern.apachecommons.CommonsLog;
i


1 Reply

Spring Boot 2.3 manages spring-kafka 2.5 (and kafka-clients 2.5.0) by default. Since you have overridden its prescribed spring-kafka version with 2.6.5, you must also override all of the other Kafka dependencies to match: kafka-clients 2.6.1 and kafka-streams 2.6.1 (if you are using them).
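
As a rough sketch of what such an override could look like in the build.gradle from the question: this assumes the io.spring.dependency-management plugin stays applied, so that Boot's `spring-kafka.version` and `kafka.version` properties control the managed versions. The version numbers simply mirror the ones named in this answer.

```groovy
// Assumption: Boot 2.3.x with the io.spring.dependency-management plugin applied.
// These properties override the versions managed by Spring Boot.
ext['spring-kafka.version'] = '2.6.5'
ext['kafka.version'] = '2.6.1'   // governs kafka-clients, kafka-streams, etc.

dependencies {
    // No explicit version here: it comes from the overridden properties above.
    implementation 'org.springframework.kafka:spring-kafka'
    // implementation 'org.apache.kafka:kafka-streams'   // only if you use Kafka Streams
}
```

This only aligns the Apache Kafka jars with each other; the io.confluent artifacts still need a release built for the same Kafka line. Running `gradle dependencyInsight --dependency kafka-clients --configuration runtimeClasspath` should show which kafka-clients version is actually resolved.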

If you are using the embedded Kafka broker in tests, you need to override several other jars as well. See https://docs.spring.io/spring-kafka/docs/current/reference/html/#update-deps

Alternatively, upgrade to Boot 2.4, which uses spring-kafka 2.6.x and will bring in all the right versions.

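Confluent 5.4 uses Kafka 2.4. That is consistent with your stack trace: the 5.4.0 monitoring interceptor tries to load org.apache.kafka.common.requests.IsolationLevel, and the kafka-clients jar on your classpath does not contain that class.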

You should use the Confluent version that matches the spring-kafka and kafka-clients versions prescribed by Spring Boot.
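
One way to apply this to the build in the question, sketched under the assumption that you stay on Boot 2.3.x: drop the explicit spring-kafka version so Boot's managed 2.5.x line applies, and move the Confluent artifacts to a release built for Kafka 2.5. According to Confluent's interoperability table that is the 5.5.x line; the `confluentVersion` property and the patch version below are illustrative choices, not something taken from the question.

```groovy
ext {
    // Assumption: a Confluent Platform 5.5.x release (built against Kafka 2.5.x).
    confluentVersion = '5.5.1'
}

dependencies {
    // Version intentionally omitted so Spring Boot 2.3.x manages spring-kafka
    // and kafka-clients together.
    implementation 'org.springframework.kafka:spring-kafka'

    implementation "io.confluent:kafka-schema-registry-client:${confluentVersion}"
    implementation "io.confluent:kafka-avro-serializer:${confluentVersion}"
    implementation "io.confluent:monitoring-interceptors:${confluentVersion}"
    implementation("io.confluent:kafka-streams-avro-serde:${confluentVersion}") {
        exclude module: 'log4j-over-slf4j'
    }
}
```

Keeping the Confluent version in a single property makes it harder for the four io.confluent artifacts to drift apart later.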

If you use Boot 2.4.x, use Confluent 6.0.

https://docs.confluent.io/platform/current/installation/versions-interoperability.html
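
For the Boot 2.4.x route, the corresponding changes might look roughly like this. The specific 2.4.x and 6.0.x patch versions and the `confluentVersion` property are assumptions chosen for illustration; only the Boot 2.4 / Confluent 6.0 pairing comes from the answer above.

```groovy
plugins {
    // Assumption: example Boot 2.4.x release; other plugins stay as they are.
    id 'org.springframework.boot' version '2.4.2'
}

ext {
    // Assumption: example Confluent Platform 6.0.x release.
    confluentVersion = '6.0.1'
}

dependencies {
    // Managed by Boot 2.4.x, which brings in the spring-kafka 2.6.x line.
    implementation 'org.springframework.kafka:spring-kafka'

    implementation "io.confluent:kafka-avro-serializer:${confluentVersion}"
    implementation "io.confluent:monitoring-interceptors:${confluentVersion}"
}
```

The question's build also pins several spring-boot-starter-* artifacts to 2.3.3.RELEASE or 2.4.2 explicitly; leaving those versions off as well should let Boot manage them consistently.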

