【问题标题】:How can I instantiate a Mock Kafka Topic for junit tests?如何为 junit 测试实例化 Mock Kafka 主题?
【发布时间】:2016-02-18 07:34:41
【问题描述】:

我对使用 kafka 主题的代码进行了一些 JUnit 测试。我尝试过的模拟 kafka 主题不起作用,并且在线找到的示例非常旧,因此它们也不适用于 0.8.2.1。如何使用 0.8.2.1 创建模拟 kafka 主题?

澄清一下:我选择使用真实的嵌入式 Kafka 实例来测试主题,而不是用 Mockito 模拟消息的交接(hand-off)过程。这样我就可以验证自定义的编码器和解码器是否真正有效,并确保代码在连接真正的 Kafka 实例时不会失败。

【问题讨论】:

    标签: junit mocking apache-kafka


    【解决方案1】:

    您是否尝试过使用 Mockito 之类的模拟框架模拟 kafka 消费者对象?

    【讨论】:

    【解决方案2】:

    https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java

    此示例已更新为可在新的 0.8.2.2 版本中运行。以下是包含 Maven 依赖的代码片段:

    pom.xml:

    <dependencies>
        <!-- JUnit is only needed to compile and run the tests. -->
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
          <scope>test</scope>
        </dependency>
        <!-- Kafka core (Scala 2.11 build): broker, old producer/consumer API, TopicCommand. -->
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.11</artifactId>
          <version>0.8.2.2</version>
        </dependency>
        <!-- Kafka test jar: provides the kafka.utils.TestUtils / kafka.zk.EmbeddedZookeeper
             helpers imported by KafkaProducerTest; test scope keeps it off the runtime classpath. -->
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.11</artifactId>
          <version>0.8.2.2</version>
          <classifier>test</classifier>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>0.8.2.2</version>
        </dependency>
    </dependencies>
    

    KafkaProducerTest.java:

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.I0Itec.zkclient.ZkClient;
    import org.junit.Test;
    import kafka.admin.TopicCommand;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.producer.KeyedMessage;
    import kafka.producer.Producer;
    import kafka.producer.ProducerConfig;
    import kafka.server.KafkaConfig;
    import kafka.server.KafkaServer;
    import kafka.utils.MockTime;
    import kafka.utils.TestUtils;
    import kafka.utils.TestZKUtils;
    import kafka.utils.Time;
    import kafka.utils.ZKStringSerializer$;
    import kafka.zk.EmbeddedZookeeper;
    import static org.junit.Assert.*;
    
    /**
     * Round-trip test against an embedded ZooKeeper and an embedded Kafka 0.8.2 broker:
     * creates a topic, publishes one message with the old Scala producer and reads it
     * back through the high-level consumer.
     *
     * <p>Sources of the Kafka test helpers used below:
     * <ul>
     *   <li>https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala</li>
     *   <li>https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala</li>
     *   <li>https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala</li>
     * </ul>
     */
    public class KafkaProducerTest {

        private int brokerId = 0;
        private String topic = "test";

        /**
         * Starts ZooKeeper and a single broker, creates {@code topic} with one partition,
         * sends {@code "test-message"} and asserts that the consumer receives exactly that text.
         *
         * <p>All servers and clients are released in {@code finally} blocks so that a failed
         * assertion or an exception during setup does not leave them running.
         *
         * @throws InterruptedException kept for signature compatibility
         */
        @Test
        public void producerTest() throws InterruptedException {

            // setup Zookeeper
            String zkConnect = TestZKUtils.zookeeperConnect();
            EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);

            // Declared before the try block so the finally block can release whatever
            // was actually created, even when setup fails half-way.
            ZkClient zkClient = null;
            KafkaServer kafkaServer = null;
            ConsumerConnector consumer = null;

            try {
                zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);

                // setup Broker
                int port = TestUtils.choosePort();
                Properties props = TestUtils.createBrokerConfig(brokerId, port, true);

                KafkaConfig config = new KafkaConfig(props);
                Time mock = new MockTime();
                kafkaServer = TestUtils.createServer(config, mock);

                // create topic
                String[] arguments = new String[]{"--topic", topic, "--partitions", "1", "--replication-factor", "1"};
                TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments));

                // block until the broker knows about partition 0 of the new topic (5 s limit)
                List<KafkaServer> servers = new ArrayList<KafkaServer>();
                servers.add(kafkaServer);
                TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);

                // setup producer
                Properties properties = TestUtils.getProducerConfig("localhost:" + port);
                ProducerConfig producerConfig = new ProducerConfig(properties);
                Producer<Integer, byte[]> producer = new Producer<Integer, byte[]>(producerConfig);

                try {
                    // setup simple consumer
                    // NOTE(review): the trailing -1 is presumably the consumer timeout, i.e.
                    // iterator.hasNext() below would block indefinitely if no message arrives
                    // and the fail() branch would never run - confirm against TestUtils.
                    Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1);
                    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));

                    // send message
                    KeyedMessage<Integer, byte[]> data =
                            new KeyedMessage<Integer, byte[]>(topic, "test-message".getBytes(StandardCharsets.UTF_8));

                    List<KeyedMessage<Integer, byte[]>> messages = new ArrayList<KeyedMessage<Integer, byte[]>>();
                    messages.add(data);

                    producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
                } finally {
                    // close the producer even if creating the consumer or sending failed
                    producer.close();
                }

                // deleting zookeeper information to make sure the consumer starts from the beginning
                // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka
                zkClient.delete("/consumers/group0");

                // starting consumer
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(topic, 1);
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
                ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

                if (iterator.hasNext()) {
                    String msg = new String(iterator.next().message(), StandardCharsets.UTF_8);
                    System.out.println(msg);
                    assertEquals("test-message", msg);
                } else {
                    fail();
                }
            } finally {
                // cleanup, in the same order as before: consumer, broker, ZK client, ZK server
                if (consumer != null) {
                    consumer.shutdown();
                }
                if (kafkaServer != null) {
                    kafkaServer.shutdown();
                }
                if (zkClient != null) {
                    zkClient.close();
                }
                zkServer.shutdown();
            }
        }
    }
    

    请务必运行 mvn dependency:tree,检查是否存在相互冲突的库。我不得不为 slf4j 和 log4j 添加排除项:

    <!-- Same three Kafka 0.8.2.2 artifacts as in the pom.xml fragment above, each with the
         slf4j-log4j12 and log4j transitive dependencies excluded to avoid conflicting
         logging libraries on the classpath. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Kafka test-classifier jar, same exclusions. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.2</version>
        <classifier>test</classifier>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- kafka-clients, same exclusions. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    

    我正在研究的另一个选项是使用 Apache Curator(参见相关问题 “Is it possible to start a zookeeper server instance in process, say for unit tests?”):

    <!-- Apache Curator test utilities, limited to the test classpath; the Java snippet
         that follows uses TestingServer, presumably supplied by this artifact - confirm. -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-test</artifactId>
        <version>2.2.0-incubating</version>
        <scope>test</scope>
    </dependency>
    

    TestingServer zkTestServer;

    /**
     * Creates a {@code TestingServer} on port 2181 before each test and builds a Curator
     * client pointed at that server's connect string, retrying once after 2000 ms.
     *
     * <p>NOTE(review): the client is only created here, not started, and the {@code cli}
     * field is declared outside this snippet - confirm both against the enclosing class.
     *
     * @throws Exception if the test server cannot be created
     */
    @Before
    public void startZookeeper() throws Exception {
        zkTestServer = new TestingServer(2181);
        final String connectString = zkTestServer.getConnectString();
        cli = CuratorFrameworkFactory.newClient(connectString, new RetryOneTime(2000));
    }
    
    /**
     * Closes the Curator client and stops the test ZooKeeper server after each test.
     *
     * <p>Both fields are null-checked because JUnit still runs {@code @After} methods when
     * the {@code @Before} method failed, in which case either field may be unset. The
     * server is stopped in a {@code finally} block so that a failure while closing the
     * client does not leave the server running.
     *
     * @throws IOException if stopping the test server fails
     */
    @After
    public void stopZookeeper() throws IOException {
        try {
            if (cli != null) {
                cli.close();
            }
        } finally {
            if (zkTestServer != null) {
                zkTestServer.stop();
            }
        }
    }
    

    【讨论】:

    • 能否提供适用于 0.11.0.2 版本的代码。上面的代码不工作