Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

java - Spring Kafka: Multiple Listeners for different objects within an ApplicationContext

What is the best way to listen to multiple topics when each topic carries messages of a different class?

I've been playing around with Spring Kafka for the past couple of days. My thought process so far:

  • You need to pass your deserializer into DefaultKafkaConsumerFactory when initializing a KafkaListenerContainerFactory. This seems to indicate that if I need multiple containers, each deserializing a message of a different type, I will not be able to use the @EnableKafka and @KafkaListener annotations.

  • This leads me to think that the only way to do so would be to instantiate multiple KafkaMessageListenerContainers.

  • And given that a KafkaMessageListenerContainer is single-threaded and I need to listen to multiple topics at the same time, I really should be using multiple ConcurrentMessageListenerContainers.

Am I on the right track here? Is there a better way to do this?

Thanks!
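
The constraint in the first bullet can be pictured without Spring at all. The following is only a plain-JDK sketch, not Spring Kafka's API: TypedDeserializer and DeserializerRegistrySketch are hypothetical names, and Class1 and Class2 are placeholder types with an assumed single String field. It shows that a deserializer bound to one target class does not prevent several of them from coexisting, as long as each one is registered under its own key (here, the topic name).

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Placeholder message types; the real fields are an assumption.
final class Class1 {
    final String body;
    Class1(String body) { this.body = body; }
}

final class Class2 {
    final String body;
    Class2(String body) { this.body = body; }
}

// Hypothetical stand-in for a deserializer that is bound to exactly one target class.
final class TypedDeserializer<T> {
    private final Class<T> targetType;
    private final Function<String, T> parser;

    TypedDeserializer(Class<T> targetType, Function<String, T> parser) {
        this.targetType = targetType;
        this.parser = parser;
    }

    Class<T> targetType() { return targetType; }

    // Decodes the payload as UTF-8 text and hands it to the type-specific parser.
    T deserialize(byte[] payload) {
        return parser.apply(new String(payload, StandardCharsets.UTF_8));
    }
}

// Holds one deserializer per topic, so differently typed topics can be handled side by side.
final class DeserializerRegistrySketch {
    private final Map<String, TypedDeserializer<?>> byTopic = new HashMap<>();

    <T> void register(String topic, TypedDeserializer<T> deserializer) {
        byTopic.put(topic, deserializer);
    }

    Object dispatch(String topic, byte[] payload) {
        TypedDeserializer<?> deserializer = byTopic.get(topic);
        if (deserializer == null) {
            throw new IllegalArgumentException("No deserializer registered for topic " + topic);
        }
        return deserializer.deserialize(payload);
    }

    public static void main(String[] args) {
        DeserializerRegistrySketch registry = new DeserializerRegistrySketch();
        registry.register("topic1", new TypedDeserializer<>(Class1.class, Class1::new));
        registry.register("topic2", new TypedDeserializer<>(Class2.class, Class2::new));

        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(registry.dispatch("topic1", payload).getClass().getSimpleName()); // Class1
        System.out.println(registry.dispatch("topic2", payload).getClass().getSimpleName()); // Class2
    }
}
```

In Spring Kafka terms, the registry key roughly plays the role of a named container factory: one per message type, selected by name.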


1 Reply


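Here is a very simple example. It keeps @EnableKafka and @KafkaListener: each message type gets its own ConsumerFactory and its own ConcurrentKafkaListenerContainerFactory, and each listener selects its factory through the containerFactory attribute.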

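// -----------------------------------------------
// Sender
// -----------------------------------------------

// Each public class below belongs in its own file; the imports are listed once here.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
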
@Configuration
public class SenderConfig {
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ...... (other properties omitted in the original)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Class1> producerFactory1() {
        return new DefaultKafkaProducerFactory<String, Class1>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Class1> kafkaTemplate1() {
        return new KafkaTemplate<>(producerFactory1());
    }

    @Bean
    public Sender1 sender1() {
        return new Sender1();
    }

    //-------- send the second class --------

    @Bean
    public ProducerFactory<String, Class2> producerFactory2() {
        return new DefaultKafkaProducerFactory<String, Class2>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Class2> kafkaTemplate2() {
        return new KafkaTemplate<>(producerFactory2());
    }

    @Bean
    public Sender2 sender2() {
        return new Sender2();
    }
}

public class Sender1 {
    @Autowired
    private KafkaTemplate<String, Class1> kafkaTemplate1;

    public void send(String topic, Class1 c1) {
        kafkaTemplate1.send(topic, c1);
    }
}

public class Sender2 {
    @Autowired
    private KafkaTemplate<String, Class2> kafkaTemplate2;

    public void send(String topic, Class2 c2) {
        kafkaTemplate2.send(topic, c2);
    }
}

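// -----------------------------------------------
// Receiver
// -----------------------------------------------

// Each public class below belongs in its own file; the imports are listed once here.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;          // assumed logging API behind LOGGER in the receivers
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
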
@Configuration
@EnableKafka
public class ReceiverConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ...... (other properties omitted in the original)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, Class1> consumerFactory1() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Class1.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Class1> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, Class1> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory1());
        return factory;
    }

    @Bean
    public Receiver1 receiver1() {
        return new Receiver1();
    }

    //-------- add the second listener

    @Bean
    public ConsumerFactory<String, Class2> consumerFactory2() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Class2.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Class2> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, Class2> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory2());
        return factory;
    }

    @Bean
    public Receiver2 receiver2() {
        return new Receiver2();
    }
}

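public class Receiver1 {
    // LOGGER was not declared in the original; an SLF4J logger is assumed here.
    private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Receiver1.class);

    // Payloads from topic1 are deserialized to Class1 by kafkaListenerContainerFactory1.
    @KafkaListener(id = "listener1", topics = "topic1", containerFactory = "kafkaListenerContainerFactory1")
    public void receive(Class1 c1) {
        LOGGER.info("Received c1");
    }
}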

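public class Receiver2 {
    // LOGGER was not declared in the original; an SLF4J logger is assumed here.
    private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Receiver2.class);

    // Payloads from topic2 are deserialized to Class2 by kafkaListenerContainerFactory2.
    @KafkaListener(id = "listener2", topics = "topic2", containerFactory = "kafkaListenerContainerFactory2")
    public void receive(Class2 c2) {
        LOGGER.info("Received c2");
    }
}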
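
On the threading point from the question: the container factories above are already the concurrent kind, so a listener can be given more than one consumer thread by setting the factory's concurrency. The fragment below is a minimal sketch of a variant of kafkaListenerContainerFactory1, not part of the original answer. The class name ConcurrentReceiverConfig, the empty Class1 placeholder, the broker address, the group id and the concurrency value 3 are all assumptions chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
@EnableKafka
public class ConcurrentReceiverConfig {

    /** Placeholder for the answer's Class1; its real fields are not shown in the original. */
    public static class Class1 {
    }

    @Bean
    public ConsumerFactory<String, Class1> consumerFactory1() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");                  // assumed group id
        // The deserializer instances passed here determine the key and value types.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(Class1.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Class1> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, Class1> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory1());
        // Each container built by this factory runs up to 3 consumers (illustrative value).
        factory.setConcurrency(3);
        return factory;
    }
}
```

Consumers beyond the number of partitions in the topic stay idle, so a concurrency above the partition count brings no extra throughput.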
