【问题标题】:@KafkaListener Read from beginning every time@KafkaListener 每次都从头开始阅读
【发布时间】:2019-05-02 11:36:18
【问题描述】:

我正在使用下面的示例来使用 spring Kafka 消费者来读取消息。我的用例要求每次生成消息时,侦听器每次都从头开始读取。

@KafkaListener(
    id = "grouplistener",
    topicPartitions = { 
        @TopicPartition(
            topic = "mycompactedtopic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")
        )
    }
)

public void onReceiving(
    String payload, @Header(KafkaHeaders.OFFSET) Integer offset,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic
) {
    log.info(
        "Processing topic = {}, partition = {}, offset = {}, payload= {}",
        topic, partition, offset, payload
    );
}

我似乎只能在应用程序启动时从头开始读取它,然后它通常只会消耗消息。

有没有办法让它每次都开始寻找?

【问题讨论】:

  • 每次发送消息,是否需要重新消费旧消息?
  • 这样做的商业目的是什么? Apache Kafka 的要点是它不会删除消息,因此您每次都会从主题中获取所有记录。消费者方面也没有办法知道生产已经向主题发送了一条记录。这不是消息中间件的用途......
  • @cricket_007 是的,完全正确。这个主题具体不会包含几百万条记录,它是一个配置主题,因此每次消费时我都需要阅读整个主题。
  • 目的是,我很欣赏它不是 Kafka 的设计目的,而是我被要求实现的是使用具有 1 个分区的压缩主题来保存配置列表。然后这需要由一个休息端点调用,它应该显示一个完整的唯一配置列表,唯一的部分很好,一个 java 集合排序,我遇到的问题是当我用普通消费者实现它时,它非常慢,因为它必须初始化消费者并重置偏移量。使用@Kafka 监听器,它只显示最新消息。

标签: spring apache-kafka spring-kafka


【解决方案1】:

使用带有 1 个分区的压缩主题来保存配置列表。然后需要由一个休息端点调用它,它应该显示一个完整的唯一配置列表

您应该实现这一点的方式是使用 Kafka Streams 和 KTable 并在您的 REST 层后面设置 interactive queries。不是一个标准的消费者,需要自己倒带以获得系统的最新状态。

Kafka Connect 框架中已经存在一个这样的例子,它有一个配置主题,你只能访问GET /connectors/name/config 的最新值,并且只有当你重新启动它或扩展到更多实例时,它才会再次消费所有消息。 Schema Registry 也是一个例子,它存储了 _schemas 主题中所有模式的内部 Hashmap,并具有用于读取、插入、删除的 REST API

本质上,当您获得给定键的新配置时,您可以用一个全新的键“替换”给定键的旧值,或者您可以将旧值与新数据“合并”,在某种方式。

【讨论】:

  • 感谢您的回复。我已经排除了 Kafka 流,对于我最多需要 100-200 条记录的开销来说,开销太高了。我已经查看了它是如何在模式注册表中完成的,它看起来像是将主题复制到内存缓存中并在添加新消息时对其进行更新。我想我只会使用这种方法,我会将它们复制到 mem db 中的 h2 中,然后我可以使用标准 spring crudrepository 与之交互。
  • 不确定您所说的“开销”是什么意思。本地存储在内存中的一个问题是,如果您扩展应用程序,每个实例只能获取部分数据。模式注册表只有一个分区主题,并且只有一个接受请求的合格主节点,因此可以避免该问题。如果你真的想复制它,那就是真正的开销来自哪里。如果您在谈论代码,我相信有一个 Spring Kafka Streams 包装器
【解决方案2】:

我将如何实现它。您需要实现ConsumerSeekAware 接口并对onPartitionsAssigned 方法进行一些实现。如果您在重新启动应用程序时发送环境变量,您也可以按需进行 seekToBegining。不过我还没有实现!

@Service
@EnableKafka
public class Service implements ConsumerSeekAware {



    @KafkaListener(topics = "${topicName}", groupId = "${groupId}")
    public void listen(@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
                       @Payload List<String> messageBatch
    ) {
            //do a bunch of stuff
    }



    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        String topic= Optional.ofNullable(System.getProperty(TOPIC_NAME)).orElseThrow(()->new RuntimeException("topicName needs to be set"));
        assignments.keySet().stream().filter(partition->topic.equals(partition.topic()))
                .forEach(partition -> callback.seekToBeginning(topic, partition.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {}

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {}
}

【讨论】:

    【解决方案3】:
    @KafkaListener(topicPartitions 
              = @TopicPartition(topic = "test", partitionOffsets = {
              @PartitionOffset(partition = "0", initialOffset = "0")}),groupId = "foo",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenAllMsg(@Payload String message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(" all msg Received Messasge in group 'foo': " + message+"RECEIVED_PARTITION_ID - "+partition);
    
    }
    

    在卡夫卡 2.3.1

    【讨论】:

      【解决方案4】:

      我认为您应该尝试编写 ConsumerSeekAwareListener,并在每次阅读消息时寻求 0 偏移量。听起来像是疯狂的解决方法,但它可能会有所帮助。希望这会对你有所帮助:-)

      class Listener implements ConsumerSeekAware {
      
       private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
      
         ----Override all methods that are needed----
      
      @KafkaListener(...)
          public void listen(@Payload String message) {
      
                  this.seekCallBack.get().seek(topic, partition, 0);
              }
          }
      }
      

      【讨论】:

      • 谢谢,但这会导致无限循环。
      • @Nimo1981 是的,这将使消费者无限运行并监听推送的任何新记录,并且每次消费者从头开始。这不是故意的吗?对不起,我可能误解了你的意思“无限循环”。您的意思是说消费者永远不会阅读并且只会进入无限循环吗?你能澄清一下吗?
      【解决方案5】:

      @Nimo1981 所以这是一个纯 Java 的实现。我不确定它是否满足您的需求。所以基本上我提交的偏移量为 0,(意思是,即使我从 Kafka 主题中读取,我也会回到开头的偏移量。)我不确定你是否考虑过这个实现,但请让我知道如果这就是你要找的

      省略 CommitCountObj。这对你来说是不需要的。 所以默认情况下 offsetMap 会有这样的下一个偏移记录,

      offsetMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "一些提交成功消息"));

      但是对于您的用例,我进行了修改,当消费者没有重新启动时效果很好

      offsetMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(0,"未完成提交"));

      public class KafkaConsumerClass {
      
          private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaConsumerClass.class);
          private CommitCountClass commitCountobj = new CommitCountClass();
      
          public Consumer<String, List<FeedBackConsumerClass>> createConsumer() {
              Map<String, Object> consumerProps = new HashMap<String, Object>();
              consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:7070,localhost:7072");
              consumerProps.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 50000);
              consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "first_group-client1");
              // consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
              consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "first_group");
              // consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaConsumerInterceptor.class);
              consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
              consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
              consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
              consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1500);
              consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      
              return new KafkaConsumer<String, List<FeedBackConsumerClass>>(consumerProps);
          }
      
          public void consumeRecord() {
              log.info("Coming inside consumer consumer");
              ArrayList<String> topicList = new ArrayList<String>();
              topicList.add("topic1");
              commitCountobj.setCount(0);
              Consumer<String, List<FeedBackConsumerClass>> kafkaConsumer = createConsumer();
              kafkaConsumer.subscribe(topicList);
              log.info("after subscribing");
      
              Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
      
              while (true) {
      
                  ConsumerRecords<String, List<FeedBackConsumerClass>> recordList = kafkaConsumer.poll(Long.MAX_VALUE);
                  // kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
      
                  log.info("Inside while loop:" + recordList);
                  if (!recordList.isEmpty()) {
                      recordList.forEach(record -> {
                          int i = 0;
                          System.out.println(record.toString());
                          // we can make the call to the API here
                          // call the db here or any API and process the record
                          // then call the code to commit
                          // since the commit is switched off, it becomes a developers responsibility to do the auto commit
                          offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                                  new OffsetAndMetadata(0, "no metadata/offset commited"));
                          // here we are incrementing the offsetMap so that we are making sure we are storing the
                          // next set of offsets in the map
                          if (commitCountobj.getCount() % 1000 == 0) {
                              kafkaConsumer.commitAsync(offsetMap, new OffsetCommitCallback() {
      
                                  @Override
                                  public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                          Exception exception) {
                                      // TODO Auto-generated method stub
                                      if (exception != null) {
                                          // retry it now with a sync
                                          // possibility of error occuring here as well
                                          // so capture the exception and exit the consumer gracefully
                                          kafkaConsumer.commitSync();
                                          log.error(exception.getMessage());
                                      }
                                  }
                              });
                          }
                          commitCountobj.setCount(i++);
                      });
                  }
      
              }
          }
      
      }
      

      【讨论】:

      • 我没有关闭消费者,但是你可以尝试在它读取后关闭消费者,记住你需要实现一个机制来在每次生产者产生记录时触发消费者,因为Apache Kafka自己不会这样做。
      • 记住不建议每次都关闭和启动消费者。并且 Kafka 不会在每次生成记录时都创建/重新启动/启动消费者。这绝不是一个只能使用 Kafka 才能实现的用例。此外,由于性能开销,不建议为产生的每条新记录重新启动/创建新消费者。但是,它可以保证消费者从一开始或从最早提交的偏移量开始消费,但这完全取决于用例。
      • 现在,如果您不想使用普通的旧 Java,可以使用 springboot。我遇到了这个解决方案,它就像一个魅力 - stackoverflow.com/questions/40352008/…
      猜你喜欢
      • 1970-01-01
      • 2011-02-26
      • 2018-07-12
      • 1970-01-01
      • 2015-08-10
      • 1970-01-01
      • 1970-01-01
      • 2016-09-11
      • 2017-03-14
      相关资源
      最近更新 更多