Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
2.1k views
in Technique[技术] by (71.8m points)

spring - Send two Serialized Java objects under one Kafka Topic

I want to implement Kafka Consumer and Producer which sends and receives Java Objects. Full Source I tried this:

Producer:

    @Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaProducerFactory<>(configProps);
    }


    @Bean
    public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(ProducerFactory<String, SaleRequestFactory> producerFactory, ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) {
        ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer = factory.createContainer("tp-sale");
        kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
        return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    }
}

Send object:

 @RestController
@RequestMapping("/checkout")
public class CheckoutController {
    
    private TransactionService transactionService;
    private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
    private ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;
    private static String topic = "tp-sale";

    @Autowired
    public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
                              KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
                              ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate){
        this.transactionService = transactionService;
        this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    @PostMapping("test")
    private void performPayment() throws ExecutionException, InterruptedException, TimeoutException {

        Transaction transaction = new Transaction();
        transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());

        Transaction insertedTransaction = transactionService.save(transaction);

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);

        ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>("tp-sale", obj);
        RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
        SendResult<String, SaleRequestFactory> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, SaleResponseFactory> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);


        SaleResponseFactory value = consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
    }
}

Consumer:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    private String groupId = "test";

    @Bean
    public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Receive Object

@Component
public class ProcessingSaleListener {

    private static String topic = "tp-sale";

    @KafkaListener(topics = "tp-sale")
    public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {

        System.out.println(tf.getId());

        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    }
}

Custom objects

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable {

    private static final long serialVersionUID = 1744050117179344127L;
    
    private int id;

}

Serializer

import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {

    @Override
    public byte[] serialize(String topic, SaleRequestFactory data)
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try
        {
            ObjectOutputStream outputStream = new ObjectOutputStream(out);
            outputStream.writeObject(data);
            out.close();
        }
        catch (IOException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
        return out.toByteArray();
    }
}

Response Object

import java.io.Serializable;
import java.time.LocalDateTime;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable {

    private static final long serialVersionUID = 1744050117179344127L;

    private String unique_id;
}

Response Class

import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {

    @Override
    public SaleRequestFactory deserialize(String topic, byte[] data)
    {
        SaleRequestFactory saleRequestFactory = null;
        try
        {
            ByteArrayInputStream bis = new ByteArrayInputStream(data);
            ObjectInputStream in = new ObjectInputStream(bis);
            saleRequestFactory = (SaleRequestFactory) in.readObject();
            in.close();
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
        return saleRequestFactory;
    }
}

I want to send and receive different Serialized Java Object based on the Object type. For example sometimes SaleRequestFactory and to receive SaleResponseFactory or to send AuthRequestFactory and receive AuthResponseFactory. Is it possible to send and receive diffrent Java Obects using one topic?

Full example code

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Use Object as the value type - here is an example using Boot's auto-configured infrastructure beans...

@SpringBootApplication
public class So65866763Application {

    public static void main(String[] args) {
        SpringApplication.run(So65866763Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            template.send("so65866763", new Foo());
            template.send("so65866763", new Bar());
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so65866763").partitions(1).replicas(1).build();
    }

}

class Foo implements Serializable {

}

class Bar implements Serializable {

}

@Component
@KafkaListener(id = "so65866763", topics = "so65866763")
class Listener {

    @KafkaHandler
    void fooListener(Foo foo) {
        System.out.println("In fooListener: " + foo);
    }

    @KafkaHandler
    void barListener(Bar bar) {
        System.out.println("In barListener: " + bar);
    }

}
public class JavaSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        return null;
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            return baos.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

}
public class JavaDeserializer implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] data) {
        return null;
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        try (ObjectInputStream ois = new ObjectInputStream(bais)) {
            return ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.producer.value-serializer=com.example.demo.JavaSerializer
spring.kafka.consumer.value-deserializer=com.example.demo.JavaDeserializer
In fooListener: com.example.demo.Foo@331ca660
In barListener: com.example.demo.Bar@26f54288

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...