【问题标题】:Test Kafka Streams topology测试 Kafka Streams 拓扑
【发布时间】:2017-06-09 02:35:48
【问题描述】:

我正在寻找一种方法来测试 Kafka Streams 应用程序。这样我就可以定义输入事件,测试套件会向我显示输出。

如果没有真正的 Kafka 设置,这可能吗?

【问题讨论】:

    标签: testing apache-kafka apache-kafka-streams


    【解决方案1】:

    更新 Kafka 1.1.0(2018 年 3 月 23 日发布):

    KIP-247 添加了官方测试工具。根据Upgrade Guide

    有一个新的工件kafka-streams-test-utils 提供TopologyTestDriverConsumerRecordFactoryOutputVerifier 类。您可以将新工件作为常规依赖项包含到您的单元测试中,并使用测试驱动程序来测试您的 Kafka Streams 应用程序的业务逻辑。详情请见KIP-247

    来自documentation

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-test-utils</artifactId>
            <version>1.1.0</version>
            <scope>test</scope>
        </dependency>
    

    测试驱动程序模拟库运行时不断从输入主题中获取记录并通过遍历拓扑来处理它们。您可以使用测试驱动程序来验证您指定的处理器拓扑是否使用手动输入的数据记录计算出正确的结果。测试驱动程序捕获结果记录并允许查询其嵌入式状态存储:

        // Create your topology
        Topology topology = new Topology();
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    
        // Run it on the test driver
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
    
        // Feed input data
        ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
        testDriver.pipe(factory.create("key", 42L));
    
        // Verify output
        ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
    

    详情请见the documentation


    ProcessorTopologyTestDriver 从 0.11.0.0 开始可用。它在kafka-streams 测试工件中可用(在Maven 中用&lt;classifier&gt;test&lt;/classifier&gt; 指定):

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.11.0.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>
    

    您还需要添加kafka-clients 测试工件:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>
    

    然后您可以使用测试驱动程序。根据 Javadoc,首先创建一个 ProcessorTopologyTestDriver:

        StringSerializer strSerializer = new StringSerializer();
        StringDeserializer strDeserializer = new StringDeserializer();
        Properties props = new Properties();
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
        props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
        props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
        props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
        props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
        StreamsConfig config = new StreamsConfig(props);
        TopologyBuilder builder = ...
        ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
    

    您可以将输入输入到拓扑中,就好像您实际上已经写入了输入主题之一:

        driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
    

    并阅读输出主题:

        ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
        ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
        ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
    

    然后你可以断言这些结果。

    【讨论】:

    • 在我的情况下,record1record2 是与主题的第一个对象相同的对象。我不知道我怎么能得到第二个
    • 可能由任何原因引起。这是您通过这些测试寻找的那种东西:)
    • @DmitryMinkovsky:只是一个查询。我们如何测试 1.0.0 版的 Kafka Streams 应用程序?我不认为kafka-streams-test-utils 支持 1.0.0 版本。
    【解决方案2】:
    1. 当您询问是否可以在没有真正 Kafka 设置的情况下测试 Kafka Streams 应用程序时,您可以尝试使用 Scala 中的这个 Mocked Streams 库。 Mocked Streams 1.0 是一个适用于 Scala >= 2.11.8 的库,它允许您在没有 Zookeeper 和 Kafka Brokers 的情况下对 Kafka Streams 应用程序(因为 Apache Kafka >=0.10.1)的处理拓扑进行单元测试。 参考:https://github.com/jpzk/mockedstreams

    2. 您还可以使用 scalatest-embedded-kafka,它是一个库,可提供内存中的 Kafka 代理来运行您的 ScalaTest 规范。它使用 Kafka 0.10.1.1 和 ZooKeeper 3.4.8。
      参考:https://github.com/manub/scalatest-embedded-kafka#scalatest-embedded-kafka-streams

    祝你好运!

    【讨论】:

      【解决方案3】:

      Spring kafka 支持使用嵌入式 kafka 进行单元测试,请参阅https://docs.spring.io/spring-kafka/docs/2.1.0.RELEASE/reference/html/_reference.html#__embeddedkafka_annotation

      此外,kafka 团队正在努力为流 https://issues.apache.org/jira/browse/KAFKA-3625 发布测试驱动程序。

      【讨论】:

        【解决方案4】:

        您可以只在本地运行一个 Zookeeper 和代理来测试 Kafka Streams 应用程序。

        只需遵循这些快速入门指南:

        还可以查看这个 Kafka Streams 示例(在 JavaDocs 中有详细的演练说明):

        【讨论】:

        • 是的,这就是我现在要做的
        • 我明白了。你想做单元测试——你的问题并不清楚……有些人使用 Kafka Streams 的内部测试类——它们不是正式发布的一部分,但你可以从github.com/apache/kafka/tree/trunk/streams/src/test/java/org/… 获得。有关使用模式,请参阅 Kafka Streams 的集成测试。唯一的缺点是,这些类可能会在没有通知的情况下更改(因为它们是内部的)。因此,如果您稍后从 github 更新这些类,您的测试设置可能会中断。但只要你坚持一个版本,if 应该可以正常工作。
        【解决方案5】:

        你应该检查卡夫卡单元here

        您的测试设置应如下所示:

        KafkaUnit kafkaUnitServer = new KafkaUnit();
        kafkaUnitServer.startup();
        kafkaUnitServer.createTopic(testTopic);
        KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
        kafkaUnitServer.sendMessages(keyedMessage);
        

        然后要阅读您的消息并断言一切正常,您可以执行以下操作:

        List<String> messages = kafkaUnitServer.readMessages(testTopic, 1);
        

        这实际上启动了一个嵌入式 kafka,它可以帮助您将所需的一切都包含在测试中。

        您可以稍微花哨一点,将嵌入式 kafka 设置为 setup() 方法(或 Spock 中的 setupSpec()),然后在 teardown() 中停止嵌入式 kafka。

        【讨论】:

          【解决方案6】:

          您可以使用https://github.com/jpzk/mockedstreams 参见下面的示例...

          import com.madewithtea.mockedstreams.MockedStreams
          
          val input = Seq(("x", "v1"), ("y", "v2"))
          val exp = Seq(("x", "V1"), ("y", "V2"))
          val strings = Serdes.String()
          
          MockedStreams()
            .topology { builder => builder.stream(...) [...] }
            .input("topic-in", strings, strings, input)
            .output("topic-out", strings, strings, exp.size) shouldEqual exp
          

          希望对你有帮助...

          【讨论】:

            【解决方案7】:

            如果要测试使用Processor APIKafka Stream 拓扑,Dmitry 提供的代码可能无法正常工作。因此,在对Javadocsofficial docs 进行了几个小时的研究后,我得出了一个工作代码,以测试您使用JUnit 实现的自定义处理器。

            public class TopologySpec {
            
            private TopologyTestDriver testDriver;
            
            @Before
            public void setup() {
                // Processor API
                Topology topology = new Topology();
                topology.addSource("sourceProcessor", "input-topic");
                // In this case, 'EventProcessor' is a custom processor
                // that I implemented and I want to test
                topology.addProcessor("processor", EventProcessor::new, "sourceProcessor");
                topology.addSink("sinkProcessor", "output-topic", "processor");
            
                // Setup test driver
                Properties config = new Properties();
                config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
                config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                // EventProcessor is a <String,String> processor 
                // so we set those serders
                config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                testDriver = new TopologyTestDriver(topology, config);
            }
            
            @After
            public void tearDown() {
                testDriver.close(); // Close processors after finish the tests
            }
            
            @Test
            public void firstTest() {
                // Simulate a producer that sends the message "value,val" without key
                ConsumerRecordFactory factory =
                        new ConsumerRecordFactory(new StringSerializer(), new StringSerializer());
            
                testDriver.pipeInput(factory.create("input-topic", "value,val"));
            
                // Simulate a consumer that reads from the output topic 
                // where are supposed to be the messages after being processed
                // by your custom processor
                ProducerRecord<String, String> record1 =
                        testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
            
                // Compare the output to ensure that your custom processor
                // is working properly. In this case, my processor consumes
                // the message, concatenates ":::processed" to it, and
                // push it to the output-topic
                OutputVerifier.compareValue(record1, "value,val:::processed");
            }
            }
            

            【讨论】:

            • 为什么不呢? @米哈伊尔
            • 你使用哪个操作系统?
            • Windows,但我也在 Linux 上进行了测试,它工作正常。
            猜你喜欢
            • 2019-10-23
            • 1970-01-01
            • 1970-01-01
            • 2020-08-28
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2021-09-28
            • 1970-01-01
            相关资源
            最近更新 更多