Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

testing - Test Kafka Streams topology

I'm looking for a way to test a Kafka Streams application, so that I can define the input events and have the test suite show me the output.

Is this possible without a real Kafka setup?

1 Reply

Update Kafka 1.1.0 (released 23-Mar-2018):

KIP-247 added official test utils. Per the Upgrade Guide:

There is a new artifact kafka-streams-test-utils providing a TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier class. You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application. For more details, see KIP-247.

From the documentation:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>1.1.0</version>
        <scope>test</scope>
    </dependency>

The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology. You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped in data records. The test driver captures the results records and allows to query its embedded state stores:

    // Create your topology (add sources, processors, and sinks to it)
    Topology topology = new Topology();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    // The bootstrap server is never contacted, so a dummy address is enough
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    // Run it on the test driver
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

    // Feed input data (the value type must match the factory's Integer type)
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
    testDriver.pipeInput(factory.create("key", 42));

    // Verify output (readOutput returns null if no record is available)
    ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new IntegerDeserializer());
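
The documentation excerpt also mentions querying embedded state stores, which the snippet above does not show. Here is a minimal sketch of that, assuming the 1.1.0 API and a counting topology invented for illustration. The class name `CountStoreExample`, the store name `counts-store`, and the helper `countFor` are hypothetical, not from the Kafka documentation.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class CountStoreExample {

    // Pipes one record per given key through a count-per-key topology and
    // returns the count held in the state store for lookupKey (null if absent).
    static Long countFor(String lookupKey, String... keys) {
        // Hypothetical topology: count records per key into "counts-store"
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .groupByKey()
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                       .withKeySerde(Serdes.String())
                       .withValueSerde(Serdes.Long()));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-store-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);
        try {
            ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
            for (String key : keys) {
                // The value is irrelevant for counting but must be non-null
                driver.pipeInput(factory.create(key, "ignored"));
            }
            // Look the store up by the name given in Materialized.as(...)
            KeyValueStore<String, Long> store = driver.getKeyValueStore("counts-store");
            return store.get(lookupKey);
        } finally {
            driver.close(); // releases the driver's local state
        }
    }

    public static void main(String[] args) {
        System.out.println(countFor("a", "a", "b", "a"));
    }
}
```

With the keys `a`, `b`, `a` piped in, the store should hold a count of 2 for `a`, and a key that was never piped in should come back as `null`.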

See the documentation for details.
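
For a complete round trip, here is a hedged sketch of a test with a non-empty topology and an assertion via `OutputVerifier`, which the Upgrade Guide mentions. It assumes the 1.1.0 API; later Kafka releases changed these helper classes, so check the documentation for your version. The class `UppercaseTopologyTest`, its `runOnce` helper, and the upper-casing logic are made up for illustration.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class UppercaseTopologyTest {

    // Hypothetical business logic: upper-case every value and forward it.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .mapValues(value -> value.toUpperCase())
               .to("output-topic");
        return builder.build();
    }

    // Pipes a single record through the topology and returns the first output record.
    static ProducerRecord<String, String> runOnce(String key, String value) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config);
        try {
            ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create(key, value));
            return driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
        } finally {
            driver.close();
        }
    }

    public static void main(String[] args) {
        ProducerRecord<String, String> out = runOnce("key", "hello");
        // Throws AssertionError if the key or value differs from what is expected
        OutputVerifier.compareKeyValue(out, "key", "HELLO");
    }
}
```

In a real project the body of `main` would typically live in a JUnit test method, with the driver created in a setup method and closed in a teardown method.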


For versions before 1.1.0: ProcessorTopologyTestDriver is available as of 0.11.0.0. It ships in the kafka-streams test artifact (selected with <classifier>test</classifier> in Maven):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

You will also need to add the kafka-clients test artifact:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

Then you can use the test driver. Per the Javadoc, first create a ProcessorTopologyTestDriver:

    StringSerializer strSerializer = new StringSerializer();
    StringDeserializer strDeserializer = new StringDeserializer();
    Properties props = new Properties();
    // application.id has no default, so StreamsConfig rejects a config without it
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    // CustomTimestampExtractor stands for your own TimestampExtractor implementation
    props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
    // 0.11.0.0 configures default serdes rather than separate serializer/deserializer classes
    props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    StreamsConfig config = new StreamsConfig(props);
    TopologyBuilder builder = ...
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);

You can feed input into the topology as though you had actually written to one of the input topics:

    driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

And read output topics:

    ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

Then you can assert on these results.

