【问题标题】:Spring Cloud Stream Manual Poller KafkaSpring Cloud Stream 手动轮询器 Kafka
【发布时间】:2020-05-21 19:59:41
【问题描述】:

我不想使用@KafkaListener 或@StreamListener,但我想手动轮询kafka。我正在使用 spring-cloud-starter-stream-kafka 库,并且我有以下 Kafka Producer

  @Autowired
  private KafkaTemplate<byte[], byte[]> template;

  public void sendMessages() {
    IntStream.range(2)
             .forEach(val -> {
               template.send("kafka-topic", "hello".getBytes());
             });
  }

我想使用 spring-kafka 手动轮询相同的 kafka 主题。我尝试了以下消费者

 @Autowired
  private ConsumerFactory consumerFactory;

  public void processKafkaRecords() throws InterruptedException {
    Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
    consumer.subscribe(Arrays.asList("kafka-topic"));
    ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
    poll.forEach(record -> {
      log.info("record {}", record);
    });
  }

application.properties

spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true

spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false

但是,消费者永远不会得到生产者发送的任何记录。任何想法如何手动轮询 kafka 主题?

【问题讨论】:

  • 你需要展示你的消费者工厂配置和application.yml/properties。
  • 如何触发消费者?为什么你使用 Steam,然后使用普通产品发布并由消费者投票?
  • @GaryRussell 我没有消费者工厂配置,我假设它会使用默认的,我只是将它绑定到 kafka 主题,这还不够吗?有没有一个简单的例子,我可以看到一个简单的设置?谢谢
  • @daniu 消费者是spring bean,我手动触发。我使用 spring stream 因为我一直在使用 streamListener 但现在我想手动开始轮询
  • 嗯,你从某个地方得到了一个消费者工厂;否则@Autowired 将无法工作。如果您的 Kafka 在本地主机上;你可能会得到 Boot 的默认值。您需要添加 spring.kafka.consumer.auto-offset-reset=earliest - 有关更多信息,请参阅 Spring Boot Kafka 文档章节。您还需要等待通过RebalanceListener 进行订阅(或使用consumer.assign() 而不是subscribe())。

标签: java spring apache-kafka spring-kafka spring-cloud-stream


【解决方案1】:

可能有几个原因:

  1. Duration.ofMillis(1000) - 尝试增加时间,在某些情况下 1s 可能太低,除非您的客户端和 kafka 都在同一台机器上运行。因为poll(Duration)的文档说如果超时,会返回一个空记录集
  2. 如果您先启动了生产者,然后是消费者,并且您没有将偏移重置策略设置为最早,那么您将看不到任何消息,因为默认情况下消费者将从最新的偏移开始消费。所以,尝试设置以下auto.offset.reset=earliest
  3. 同一消费者组中的另一个消费者可能正在运行,并且只有 1 个分区,或者消费者组已经在最后一个偏移量。在这种情况下,您可以尝试更改消费者组 ID。

【讨论】:

  • 持续时间没有任何区别,道具 reset=earliest 也没有。我尝试做 consumer.assign(Collections.singleton(new TopicPartition("kafka-topic", 0))) 而不是 subscribe ,这开始立即轮询一些结果,但不是全部。我觉得我必须按照上面的建议实现 RebalanceListener。
  • 如果要从话题开始轮询,则需要调用seekToBeginning()方法。至于订阅,如果消费者组偏移量指向最后的偏移量,那么您将看不到任何消息。尝试更改group.id 或重置消费者组
猜你喜欢
  • 1970-01-01
  • 2018-04-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-15
  • 1970-01-01
相关资源
最近更新 更多