【问题标题】:How to read all the records in a Kafka topic如何读取 Kafka 主题中的所有记录
【发布时间】:2019-07-04 12:12:15
【问题描述】:

我正在使用 kafka : kafka_2.12-2.1.0, spring kafka 在客户端并且遇到了一个问题。

我需要通过读取 kafka 主题中的所有现有消息来加载内存映射。我通过启动一个新的消费者(具有唯一的消费者组 ID 并将偏移量设置为 earliest)来做到这一点。然后我遍历消费者(轮询方法)以获取所有消息并在消费者记录为空时停止。

但我注意到,当我开始轮询时,前几次迭代将消费者记录返回为空,然后它开始返回实际记录。现在这打破了我的逻辑,因为我们的代码认为主题中没有记录。

我尝试了其他几种方法(例如使用偏移量),但无法提出任何解决方案,除了在某处保存另一条记录,告诉我主题中有多少条消息需要阅读在我停下来之前。

有什么想法吗?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    除了来自@arshad 的上述答案之外,您没有获得记录的原因是因为您已经阅读了它们。在此处查看此答案using earliest or latest does not matter on the consumer after you have a committed offset for the partition

    如果您知道起始偏移量,我会使用寻找到开头或特定偏移量的方法。

    【讨论】:

      【解决方案2】:

      您必须使用 2 个消费者,一个来加载偏移量,另一个来读取所有记录。

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.PartitionInfo;
      import org.apache.kafka.common.TopicPartition;
      import org.apache.kafka.common.serialization.ByteArrayDeserializer;
      
      import java.time.Duration;
      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Objects;
      import java.util.Properties;
      import java.util.Set;
      import java.util.stream.Collectors;
      
      public class KafkaRecordReader {
      
          static final Map<String, Object> props = new HashMap<>();
          static {
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
              props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-client");
          }
      
          public static void main(String[] args) {
              final Map<TopicPartition, OffsetInfo> partitionOffsetInfos = getOffsets(Arrays.asList("world, sample"));
              final List<ConsumerRecord<byte[], byte[]>> records = readRecords(partitionOffsetInfos);
      
              System.out.println(partitionOffsetInfos);
              System.out.println("Read : " + records.size() + " records");
          }
      
          private static List<ConsumerRecord<byte[], byte[]>> readRecords(final Map<TopicPartition, OffsetInfo> offsetInfos) {
              final Properties readerProps = new Properties();
              readerProps.putAll(props);
              readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "record-reader");
      
              final Map<TopicPartition, Boolean> partitionToReadStatusMap = new HashMap<>();
              offsetInfos.forEach((tp, offsetInfo) -> {
                  partitionToReadStatusMap.put(tp, offsetInfo.beginOffset == offsetInfo.endOffset);
              });
      
              final List<ConsumerRecord<byte[], byte[]>> cachedRecords = new ArrayList<>();
              try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(readerProps)) {
                  consumer.assign(offsetInfos.keySet());
                  for (final Map.Entry<TopicPartition, OffsetInfo> entry : offsetInfos.entrySet()) {
                      consumer.seek(entry.getKey(), entry.getValue().beginOffset);
                  }
      
                  boolean close = false;
                  while (!close) {
                      final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(100));
                      for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                          cachedRecords.add(record);
                          final TopicPartition currentTp = new TopicPartition(record.topic(), record.partition());
                          if (record.offset() + 1 == offsetInfos.get(currentTp).endOffset) {
                              partitionToReadStatusMap.put(currentTp, true);
                          }
                      }
      
                      boolean done = true;
                      for (final Map.Entry<TopicPartition, Boolean> entry : partitionToReadStatusMap.entrySet()) {
                          done &= entry.getValue();
                      }
                      close = done;
                  }
              }
              return cachedRecords;
          }
      
          private static Map<TopicPartition, OffsetInfo> getOffsets(final List<String> topics) {
              final Properties offsetReaderProps = new Properties();
              offsetReaderProps.putAll(props);
              offsetReaderProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset-reader");
      
              final Map<TopicPartition, OffsetInfo> partitionOffsetInfo = new HashMap<>();
              try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(offsetReaderProps)) {
                  final List<PartitionInfo> partitionInfos = new ArrayList<>();
                  topics.forEach(topic -> partitionInfos.addAll(consumer.partitionsFor("sample")));
                  final Set<TopicPartition> topicPartitions = partitionInfos
                          .stream()
                          .map(x -> new TopicPartition(x.topic(), x.partition()))
                          .collect(Collectors.toSet());
                  consumer.assign(topicPartitions);
                  final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
                  final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
      
                  for (final TopicPartition tp : topicPartitions) {
                      partitionOffsetInfo.put(tp, new OffsetInfo(beginningOffsets.get(tp), endOffsets.get(tp)));
                  }
              }
              return partitionOffsetInfo;
          }
      
          private static class OffsetInfo {
      
              private final long beginOffset;
              private final long endOffset;
      
              private OffsetInfo(long beginOffset, long endOffset) {
                  this.beginOffset = beginOffset;
                  this.endOffset = endOffset;
              }
      
              @Override
              public String toString() {
                  return "OffsetInfo{" +
                          "beginOffset=" + beginOffset +
                          ", endOffset=" + endOffset +
                          '}';
              }
      
              @Override
              public boolean equals(Object o) {
                  if (this == o) return true;
                  if (o == null || getClass() != o.getClass()) return false;
                  OffsetInfo that = (OffsetInfo) o;
                  return beginOffset == that.beginOffset &&
                          endOffset == that.endOffset;
              }
      
              @Override
              public int hashCode() {
                  return Objects.hash(beginOffset, endOffset);
              }
          }
      }
      

      【讨论】:

      • 您可能需要编辑if (record.offset() + 1 == offsetInfos.get(currentTp).endOffset)这一行——如果您使用的是事务性生产者,因为它的偏移量不是线性的。
      【解决方案3】:

      据我了解,您想要实现的是根据特定主题中已有的值在您的应用程序中构建地图。

      对于此任务,您可以在 Kafka Streams DSL 中使用Ktable,而不是手动轮询主题,这将自动构建一个可读的键值存储,该存储具有容错、复制功能并自动填充新值。

      您可以简单地通过在流上调用 groupByKey 然后使用聚合来做到这一点。

      KStreamBuilder builder = new KStreamBuilder();
      KStream<String, Long> myKStream = builder.stream(Serdes.String(), Serdes.Long(), "topic_name");
      KTable<String, Long> totalCount = myKStream.groupByKey().aggregate(this::initializer, this::aggregator);
      

      (实际代码可能会因 kafka 版本、您的配置等而异。)

      阅读更多关于 Kafka Stream 概念的信息here

      然后我遍历消费者(轮询方法)以获取所有消息并在消费者记录为空时停止

      Kafka 是一个消息流平台。您流式传输的任何数据都在不断更新,您可能不应该以您期望消耗在一定数量的消息后停止的方式使用它。停止消费者后,如果有新消息进来,你将如何处理?

      另外你得到空记录的原因可能与记录在不同的分区等有关。

      您在这里的具体用例是什么?,可能有一种使用 Kafka 语义本身的好方法。

      【讨论】:

      • 用例只是在启动时在 kafka 中创建现有消息的内存映射。一旦它被创建,这个消费者就没有用了。还有其他消费者正在收听 kakfa 的实时事件,但在地图填充之前他们无法启动。这些消费者使用 kafka 流,但这个特定的消费者只需要阅读其中存在的内容并停止发布。
      • 是否总是前几次迭代返回空记录?您是否尝试为轮询设置更长的超时时间?您可以使用消费者代码的 sn-p 编辑原始问题吗?
      • 是的,它的前 2-3 次迭代。你是对的,如果我们将轮询时间保持在 1-2 秒,我们就不会找到这些空记录。但是我们又不能想出任何逻辑来决定时间,因为它可能在不同的环境中有所不同。
      • 你现在的超时时间是多少?,如果没有看到任何代码,就不可能提出任何建议。
      猜你喜欢
      • 2021-09-03
      • 1970-01-01
      • 2021-03-01
      • 1970-01-01
      • 2018-01-17
      • 2017-04-11
      • 1970-01-01
      • 2023-03-19
      • 2015-07-03
      相关资源
      最近更新 更多