[Question Title]: Kafka – check number of messages in each partition
[Posted]: 2016-10-17 20:11:39
[Question]:

I have implemented a round-robin partitioner, shown below:

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.log4j.Logger;

public class KafkaRoundRobinPartitioner implements Partitioner {

    private static final Logger log = Logger.getLogger(KafkaRoundRobinPartitioner.class);

    final AtomicInteger counter = new AtomicInteger(0);

    public KafkaRoundRobinPartitioner() {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int partitionsCount = partitions.size();

        int partitionId = counter.incrementAndGet() % partitionsCount;
        // Reset the counter periodically so it can never overflow int
        // (a negative counter would produce a negative partition id).
        if (counter.get() > 65536) {
            counter.set(partitionId);
        }
        return partitionId;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

Now I want to test that every partition receives the same number of messages. For example, if I have one topic with 32 partitions and I send 32 messages to it, I expect each partition to hold exactly one message.
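Before involving a broker at all, the round-robin property of the partitioner above can be checked by replaying its counter arithmetic in plain Java — a quick sketch (the class and method names here are mine, not part of any Kafka API):

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinDistributionCheck {

    // Replays the partitioner's counter arithmetic for `messages` sends
    // across `partitionsCount` partitions and returns per-partition counts.
    static int[] distribute(int messages, int partitionsCount) {
        AtomicInteger counter = new AtomicInteger(0);
        int[] perPartition = new int[partitionsCount];
        for (int i = 0; i < messages; i++) {
            perPartition[counter.incrementAndGet() % partitionsCount]++;
        }
        return perPartition;
    }

    public static void main(String[] args) {
        // 32 messages over 32 partitions: every count should be exactly 1.
        System.out.println(Arrays.toString(distribute(32, 32)));
    }
}
```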

I'd like to do something along these lines:

KafkaPartitions allPartitions = new KafkaTopic("topic_name");
for (KafkaPartition partition : allPartitions) {
    int msgCount = partition.getMessagesCount();
    // do asserts
}

As far as I know, the Kafka Java API doesn't provide such functionality, but I may have missed something in the docs.

Is there an elegant way to do this?

UPDATE: I've found a basic solution. Since I'm using a multi-consumer model, I can do the following for each consumer:

consumer.assignment().size();

After that I can do:

consumer.poll(100);

and check that each consumer got a message. In that case I shouldn't run into a situation where one consumer fetches messages from another consumer's partitions, because I have the same number of consumers as partitions, so Kafka should distribute the partitions among the consumers evenly, one each.
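That assumption — N consumers in one group over an N-partition topic each owning exactly one partition — can be illustrated with a small broker-free simulation. The arithmetic below mirrors range-style assignment (partitions split evenly, extras going to the first consumers); it is a sketch of the principle, not Kafka's actual assignor code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AssignmentCheck {

    // Splits `partitions` partition ids evenly across `consumers` consumers,
    // giving the remainder (if any) to the first consumers.
    static Map<Integer, List<Integer>> assign(int partitions, int consumers) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        int per = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int take = per + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < take; i++) {
                owned.add(next++);
            }
            assignment.put(c, owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 5 consumers, 5 partitions: each consumer owns exactly one partition.
        assign(5, 5).forEach((consumer, owned) ->
                System.out.println("consumer " + consumer + " -> " + owned));
    }
}
```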

[Question Discussion]:

    Tags: java apache-kafka integration-testing kafka-consumer-api


    [Solution 1]:

    You can use seekToBeginning() and seekToEnd(), then compute the difference between the offsets you get back for each partition.
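A sketch of that idea. With a real KafkaConsumer you would obtain the two offsets per TopicPartition via seekToBeginning()/seekToEnd() followed by position() (or via beginningOffsets()/endOffsets() on newer clients); the broker-independent part is just the subtraction, shown here with hypothetical string keys standing in for partitions:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionMessageCounts {

    // Message count per partition = end offset minus beginning offset.
    // The maps stand in for what seekToBeginning()/seekToEnd() + position()
    // would report for each partition.
    static Map<String, Long> countsFromOffsets(Map<String, Long> beginning,
                                               Map<String, Long> end) {
        Map<String, Long> counts = new HashMap<>();
        end.forEach((partition, endOffset) ->
                counts.put(partition, endOffset - beginning.getOrDefault(partition, 0L)));
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> begin = new HashMap<>();
        Map<String, Long> end = new HashMap<>();
        begin.put("topic_name-0", 5L);
        end.put("topic_name-0", 6L);
        begin.put("topic_name-1", 7L);
        end.put("topic_name-1", 8L);
        // Each partition received exactly one message
        System.out.println(countsFromOffsets(begin, end));
    }
}
```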

    [Discussion]:

      [Solution 2]:

      In the end, I wrote the following.

      The worker for my KafkaConsumer has this code:

      public void run() {
          while (keepProcessing) {
              try {
                  ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                  for (ConsumerRecord<byte[], byte[]> record : records) {
                      // do processing
                      consumer.commitSync();
                  }
              } catch (Exception e) {
                  logger.error("Couldn't process message", e);
              }
          }
      }
      

      In my test, I decided to verify that each consumer performed exactly one commit, which means the messages were distributed in a round-robin manner. Test code:

      public class KafkaIntegrationTest {

      private int consumersAndPartitionsNumber;
      private CountDownLatch latch;

      @Test
      public void testPartitions() throws Exception {
          consumersAndPartitionsNumber = Config.getConsumerThreadAmount(); // it's 5
          // Create the latch here, after the count is known; a field initializer
          // would run while consumersAndPartitionsNumber is still 0, so the
          // latch would open immediately and the test would be flaky.
          latch = new CountDownLatch(consumersAndPartitionsNumber);
          KafkaMessageQueue kafkaMessageQueue = new KafkaMessageQueue(); // just a class with Producer configuration
          String groupId = Config.getGroupId();
          List<KafkaConsumer<byte[], byte[]>> consumers = new ArrayList<>(consumersAndPartitionsNumber);

          for (int i = 0; i < consumersAndPartitionsNumber; i++) {
              consumers.add(spy(new KafkaConsumer<>(KafkaManager.createKafkaConsumerConfig(groupId))));
          }

          ExecutorService executor = Executors.newFixedThreadPool(consumersAndPartitionsNumber);
          for (KafkaConsumer<byte[], byte[]> consumer : consumers) {
              executor.submit(new TestKafkaWorker(consumer));
          }

          for (int i = 0; i < consumersAndPartitionsNumber; i++) {
              // send messages to topic
              kafkaMessageQueue.send(new PostMessage("pageid", "channel", "token", "POST", null, "{}"));
          }

          latch.await(60, TimeUnit.SECONDS);

          for (KafkaConsumer<byte[], byte[]> consumer : consumers) {
              verify(consumer).commitSync();
          }
      }

      class TestKafkaWorker implements Runnable {

          private final KafkaConsumer<byte[], byte[]> consumer;
          private boolean keepProcessing = true;

          TestKafkaWorker(KafkaConsumer<byte[], byte[]> consumer) {
              this.consumer = consumer;
              consumer.subscribe(Arrays.asList(Config.getTaskProcessingTopic()));
          }

          public void run() {
              while (keepProcessing) {
                  try {
                      ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                      for (ConsumerRecord<byte[], byte[]> record : records) {
                          consumer.commitSync();
                          keepProcessing = false;
                          latch.countDown();
                      }
                  } catch (Exception e) {
                      // ignore and keep polling; the test only cares about commits
                  }
              }
          }
      }
      }
      

      [Discussion]:
