These are my two Kafka Streams classes:
MySource
MyProcessor
The MySource class sends a continuous stream of data, which is received in the process method of the MyProcessor class.
My requirements are:
- When each message is processed inside the process method, I need to send a response (either SUCCESS or FAILED) back to the MySource class.
- When processing is unsuccessful, for example when an exception is thrown while invoking the service call (newApplication.service(value);), the process method should stop consuming any further messages to prevent data loss.
Could you please help me with this?
MySource class
kStreamBuilder.build()
    .addSource(READ_FROM_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer(), messages)
    .addProcessor(TRAN_PROCESSOR, () -> new MyProcessor(), READ_FROM_TOPIC)
MyProcessor class
public class MyProcessor implements Processor<String, String> {
    @Override
    public void process(String key, String value) {
        try {
            newApplication.service(value);
        } catch (Exception e) {
            // the exception is currently swallowed here
        }
    }
}
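The two requirements (a SUCCESS/FAILED response per message, and no further consumption after the first failure) can be modeled in plain Java as a minimal sketch. It does not use the Kafka Streams API: `StopOnFailureDemo`, `Status`, `ReportingProcessor` and `run` are hypothetical names, the `Consumer<String>` stands in for `newApplication.service(value)`, and the returned list stands in for whatever channel would carry responses back to MySource.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class StopOnFailureDemo {

    /** Per-message outcome reported back to the sender (hypothetical type). */
    public enum Status { SUCCESS, FAILED }

    /** Plain-Java stand-in for MyProcessor; it does NOT implement the Kafka Streams Processor interface. */
    public static class ReportingProcessor {
        private final Consumer<String> service; // stands in for newApplication.service(value)
        private boolean stopped = false;

        public ReportingProcessor(Consumer<String> service) {
            this.service = service;
        }

        /** Processes one message and returns its outcome; the first failure marks the processor as stopped. */
        public Status process(String key, String value) {
            try {
                service.accept(value);
                return Status.SUCCESS;
            } catch (RuntimeException e) {
                stopped = true;
                return Status.FAILED;
            }
        }

        public boolean isStopped() {
            return stopped;
        }
    }

    /** Stand-in for the source: feeds messages one by one and collects the responses. */
    public static List<Status> run(List<String> messages, Consumer<String> service) {
        ReportingProcessor processor = new ReportingProcessor(service);
        List<Status> responses = new ArrayList<>();
        for (String message : messages) {
            if (processor.isStopped()) {
                break; // remaining messages are left unconsumed
            }
            responses.add(processor.process("key", message));
        }
        return responses;
    }

    public static void main(String[] args) {
        List<Status> responses = run(Arrays.asList("a", "bad", "c"), value -> {
            if (value.equals("bad")) {
                throw new IllegalStateException("service call failed");
            }
        });
        System.out.println(responses); // prints [SUCCESS, FAILED]; "c" is never processed
    }
}
```

How this maps onto the real Kafka Streams API (for example, rethrowing the exception from process, or forwarding the status to a separate response topic) is not shown here and would need to be checked against the Kafka Streams documentation.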
question from:
https://stackoverflow.com/questions/65641103/how-to-send-response-from-kafka-stream-processors-process-method