Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

apache kafka - No results from a ksqlDB streamQuery in the Java client; logging the result shows "Not completed"

I am using Confluent Kafka version 6.0, downloaded from https://www.confluent.io/download/

I am following these references:

  1. https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/ (article)
  2. https://www.youtube.com/watch?v=85udigshlNI

With Java producer code I am able to send values to ksqlDB, but I am not able to retrieve them. When I log the result of streamQuery, I get a "Not completed" message.
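For context, `client.streamQuery(...)` returns a `CompletableFuture`, and printing a future that has not finished yet produces text ending in `[Not completed]`. The sketch below, which assumes that this is where the logged message comes from, uses only the JDK to show that behavior and a bounded wait that surfaces the underlying cause. The class `FutureStateDemo`, its helper methods, and the "connection refused" message are hypothetical stand-ins, not part of the ksqlDB client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureStateDemo {

    // Summarizes the state of a future without blocking on it.
    static String describe(CompletableFuture<?> future) {
        if (!future.isDone()) {
            return "not completed";
        }
        return future.isCompletedExceptionally() ? "failed" : "completed";
    }

    // Waits up to timeoutMillis for the result and reports how the wait ended.
    static String waitFor(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return "value: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (ExecutionException e) {
            // The real cause (for example a connection error) is wrapped here.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-in for the future returned by client.streamQuery(...); it is never completed here.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(pending);              // toString() ends with "[Not completed]"
        System.out.println(describe(pending));
        System.out.println(waitFor(pending, 100));

        // Stand-in for a request that failed; the message is made up for illustration.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("connection refused"));
        System.out.println(waitFor(failed, 100));
    }
}
```

Applied to the question's code, the same pattern would mean calling `get(timeout, unit)` on the future from `streamQuery` and logging the cause of any `ExecutionException`, rather than printing the future itself.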

Maven dependency used:

    <dependencies>
        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-api-client</artifactId>
            <version>${ksqldb.version}</version>
        </dependency>
    </dependencies>

Java code:

    import io.confluent.ksql.api.client.Client;
    import io.confluent.ksql.api.client.ClientOptions;
    import io.confluent.ksql.api.client.Row;
    import io.confluent.ksql.api.client.StreamedQueryResult;
    import java.util.concurrent.ExecutionException;

    public class ExampleApp {

      public static String KSQLDB_SERVER_HOST = "localhost";
      public static int KSQLDB_SERVER_HOST_PORT = 8088;

      public static void main(String[] args) throws ExecutionException, InterruptedException {
        ClientOptions options = ClientOptions.create()
            .setHost(KSQLDB_SERVER_HOST)
            .setPort(KSQLDB_SERVER_HOST_PORT);
        Client client = Client.create(options);

        // Start the push query; get() blocks until the returned future completes
        StreamedQueryResult streamedQueryResult =
            client.streamQuery("SELECT * FROM MY_STREAM EMIT CHANGES;").get();

        for (int i = 0; i < 10; i++) {
          // Block until a new row is available
          Row row = streamedQueryResult.poll();
          if (row != null) {
            System.out.println("Received a row!");
            System.out.println("Row: " + row.values());
          } else {
            System.out.println("Query has ended.");
            break;
          }
        }

        // Terminate any open connections and close the client
        client.close();
      }
    }

Output: get() waits for a long time, even after values are added to the topic, and finally throws a timeout exception.
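One thing that can be ruled out when get() never returns is that nothing is listening on the configured host and port. The following is a minimal sketch using only the JDK; the class `PortCheck` and its method are hypothetical helpers, and the host, port, and 2-second timeout are assumptions taken from or added to the code above. A successful TCP connection only shows that some process accepts connections there, not that it is a healthy ksqlDB server.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, unknown hosts, and connect timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port match KSQLDB_SERVER_HOST and KSQLDB_SERVER_HOST_PORT in the question.
        System.out.println("ksqlDB port reachable: " + canConnect("localhost", 8088, 2000));
    }
}
```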

Question from: https://stackoverflow.com/questions/65950868/i-am-not-getting-results-from-ksql-streamquery-integrated-with-java-when-i-am-p
