I am facing serialization errors after introducing a groupBy on my KStream, which turns part of the topology into a KTable. As I understand it, once you apply an aggregate or reduce to a KStream, Kafka Streams turns the result into a KTable, and because of the repartition (shuffle) this requires, it has to serialize the records again. My original KStream simply mapped the records from JSON to Avro like this, and it works fine:
@StreamListener("notification-input-channel")
@SendTo("notification-output-avro-channel")
public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
    log.info("received PosInvoice JSON: {}", input);
    KStream<String, NotificationAvro> notificationAvroKStream = input
            .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
            .mapValues(v -> recordBuilder.getNotificationAvro(v));
    notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
    return notificationAvroKStream;
}
Then I introduced groupByKey and reduce, and I realized that this turns the stream into a KTable and hence needed Serdes in the application.yaml file.
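In essence, that first version was just a groupByKey followed by reduce on the Avro stream, something like this sketch (no explicit serdes anywhere, relying on the defaults; the variable name is a placeholder):

// Sketch of the first attempt that triggered the serde problem:
// groupByKey().reduce() relying only on the default key/value serdes.
KTable<String, NotificationAvro> totals = notificationAvroKStream
        .groupByKey()
        .reduce((aggValue, newValue) -> {
            newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
            return newValue;
        });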
But unfortunately I cannot configure the default Serdes in application.yaml, because I have other types of serialization in the same application. Hence I decided to declare the serdes in the KTable topology itself. I am trying to implement this based on this answer.
The part of the code where I try to materialize with my custom serdes is not working properly (Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro())). First, I don't think I need KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");, but without it it does not work either, and I cannot find a Materialized that is not KeyValueBytes... where I can define my serdes CustomSerdes.String() and CustomSerdes.NotificationAvro().
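For reference, the builder form of Materialized keeps the KeyValueStore<Bytes, byte[]> store type but still accepts explicit serdes; a sketch of that variant is below (the store name notification-store is just a placeholder), although I am not sure it helps here:

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Builder-style Materialized: the store stays byte-based, but the key/value serdes are set explicitly.
KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
        .toTable(Materialized.<String, NotificationAvro, KeyValueStore<Bytes, byte[]>>as("notification-store")
                .withKeySerde(CustomSerdes.String())
                .withValueSerde(CustomSerdes.NotificationAvro()));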
According to the answer I linked, they also use a final StreamsBuilder builder = new StreamsBuilder();. But since I am building everything with spring-kafka (Spring Cloud Stream), I don't have that option, or if I do, I don't know how to use it.
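For context, this is roughly how such a topology is wired without Spring, i.e. what the linked answer assumes; the application id, broker address and topic names below are just placeholders, not my actual code:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;

public class PlainStreamsBootstrap {
    public static void main(String[] args) {
        // Placeholder configuration, only to illustrate where StreamsBuilder fits in.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "notification-plain-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("pos-topic", Consumed.with(Serdes.String(), Serdes.String()))
                // ... the filter / map / toTable / groupBy / reduce topology would go here ...
                .to("notification-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

With Spring Cloud Stream I never create this StreamsBuilder myself; my actual service looks like this: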
@Service
@Slf4j
@EnableBinding(PosListenerJsonAvroBinding.class)
public class NotificationJsonAvroProcessorService {

    @Autowired
    RecordBuilder recordBuilder;

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));

        // *********************************************
        // IS THERE A KeyValueStoreSupplier THAT I CAN PASS ALSO MY SERDES INSTEAD OF Bytes?
        // KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
        KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
                .toTable(
                        // *********************************************
                        // HOW TO MATERIALIZE KTABLE VALUES WITH SERDES ?
                        Materialized
                                // .as(storeSupplier) // this is not necessary
                                .with(CustomSerdes.String(), CustomSerdes.NotificationAvro())
                        // *********************************************
                )
                .groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro))
                .reduce(
                        (aggValue, newValue) -> {
                            newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                            return newValue;
                        },
                        (aggValue, oldValue) -> oldValue
                );

        KStream<String, NotificationAvro> notificationAggAvroKStream = convertedTable.toStream();
        notificationAggAvroKStream.foreach((k, v) -> log.info(String.format("Notification agg avro - key: %s, value: %s", k, v)));
        return notificationAggAvroKStream;
    }
}
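For completeness, the binding interface used in @EnableBinding follows the usual @Input/@Output pattern of the Kafka Streams binder; roughly like this (the method names are placeholders):

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

public interface PosListenerJsonAvroBinding {

    @Input("notification-input-channel")
    KStream<String, PosInvoice> notificationInputChannel();

    @Output("notification-output-avro-channel")
    KStream<String, NotificationAvro> notificationOutputAvroChannel();
}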
The custom serdes:
@Service
public class CustomSerdes extends Serdes {

    private static final String schema_registry_url = "http://localhost:8081";

    private final static Map<String, String> serdeConfig = Collections
            .singletonMap("schema.registry.url", schema_registry_url);

    public static Serde<NotificationAvro> NotificationAvro() {
        final Serde<NotificationAvro> notificationAvroSerde = new SpecificAvroSerde<>();
        notificationAvroSerde.configure(serdeConfig, false);
        return notificationAvroSerde;
    }
}
And the error I get:
Exception in thread
"NotificationJsonAvroProcessorService-process-applicationId-3e262d96-19ca-438d-a2b8-9d3c2e9bb4ab-StreamThread-1"
org.apache.kafka.streams.errors.StreamsException: ClassCastException
while producing data to topic
NotificationJsonAvroProcessorService-process-applicationId-KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition.
A serializer (key:
org.apache.kafka.common.serialization.StringSerializer / value:
org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not
compatible to the actual key or value type (key type: java.lang.String
/ value type: org.apache.kafka.streams.kstream.internals.Change).
Change the default Serdes in StreamConfig or provide correct Serdes
via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced)
with
Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))
).
...
...
Caused by: java.lang.ClassCastException: class
com.github.felipegutierrez.explore.spring.model.NotificationAvro
cannot be cast to class java.lang.String
(com.github.felipegutierrez.explore.spring.model.NotificationAvro is
in unnamed module of loader 'app'; java.lang.String is in module
java.base of loader 'bootstrap')