Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


java - NPE while doing context.forward() using low-level Kafka Stream API

I have built a plain Kafka Streams application using the low-level Processor API. The topology is linear.

p1 -> p2 -> p3

When I call context.forward(), I get an NPE. Here is a snippet of the stack trace:

java.lang.NullPointerException: null
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:178)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)

...

I am using Kafka Streams 2.3.0.

I found a similar SO question [here][1], but it is based on a very old version, so I am not sure whether this is the same error.

Edit

Here is some more information, showing the gist of what I am doing:

public class SP1Processor implements StreamProcessor {

    private StreamProcessingContext ctxt;

    // init() creates a single-thread pool that does some processing
    // and sends the data to the next processor.
    @Override
    public void init(StreamProcessingContext ctxt) {
        this.ctxt = ctxt;

        // Thread pool code not shown. Inside the pool thread, do some work
        // and then call this.ctxt.forward(K, V).
        //
        // Strangely, this.ctxt seen inside the pool thread is not the same
        // as what I see in process(). Since ctxt is a member variable,
        // shouldn't it be the same?
        // The forward() call made from the pool thread causes the NPE. Why?
        this.ctxt.forward(K, V);
    }

    @Override
    public void process(K, V) {
        // Do some processing and pass the record to the next processor
        // in the chain. This works fine.
        this.ctxt.forward(K, V);
    }
}

  [1]: https://stackoverflow.com/questions/39067846/periodic-npe-in-kafka-streams-processor-context

1 Reply


This looks like it could be the same issue as in the linked question, even though you are on a much more recent version. Make sure that ProcessorSupplier.get() returns a new instance each time it is called, rather than handing back one shared processor object.
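
To illustrate why the advice about ProcessorSupplier.get() matters, here is a minimal sketch in plain Java. It does not use any Kafka classes: FakeContext, FakeProcessor and startTasks are hypothetical stand-ins that only mimic a runtime which asks a supplier for one processor per task and initialises each one with that task's own context. Under that assumption, a supplier that hands back a shared instance lets a later init() overwrite the context stored by an earlier one, which would match the symptom of this.ctxt not being the object you expect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SupplierSketch {

    // Hypothetical stand-in for a per-task processor context (not a Kafka class).
    static final class FakeContext {
        final String taskId;

        FakeContext(String taskId) {
            this.taskId = taskId;
        }
    }

    // Hypothetical stand-in for a processor that keeps its context in a field,
    // the way SP1Processor keeps ctxt.
    static final class FakeProcessor {
        private FakeContext ctxt;

        void init(FakeContext ctxt) {
            this.ctxt = ctxt;
        }

        String contextTaskId() {
            return ctxt.taskId;
        }
    }

    // Mimics a runtime that requests one processor per task from the supplier
    // and initialises it with a context belonging to that task.
    static List<FakeProcessor> startTasks(Supplier<FakeProcessor> supplier, String... taskIds) {
        List<FakeProcessor> processors = new ArrayList<>();
        for (String taskId : taskIds) {
            FakeProcessor processor = supplier.get();
            processor.init(new FakeContext(taskId));
            processors.add(processor);
        }
        return processors;
    }

    public static void main(String[] args) {
        // Problematic: the supplier returns the same object on every call,
        // so the second init() replaces the context stored by the first.
        FakeProcessor shared = new FakeProcessor();
        List<FakeProcessor> bad = startTasks(() -> shared, "0_0", "0_1");
        System.out.println(bad.get(0).contextTaskId());  // prints "0_1"

        // Recommended: the supplier creates a fresh processor on every call,
        // so each task's processor keeps its own context.
        List<FakeProcessor> good = startTasks(FakeProcessor::new, "0_0", "0_1");
        System.out.println(good.get(0).contextTaskId()); // prints "0_0"
    }
}
```

In a real topology the equivalent check is to confirm that the supplier passed to addProcessor() constructs a new processor inside get() (for example with a constructor reference) instead of returning an object created once outside it.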

