【问题标题】:How to consume messages one by one from a kafka topic如何从kafka主题中一一消费消息
【发布时间】:2019-06-23 20:58:39
【问题描述】:

我已经建立了一个带有单个分区的 kafka 主题。

kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1

可以在该主题中推送许多消息。

但我希望单个消费者(对于给定的组)一个接一个地处理这些消息。

spring:
  application:
    name: file-consumer
  cloud:
    stream:
      kafka:
        binder:
          type: kafka
          brokers: localhost
          defaultBrokerPort: 29092
          defaultZkPort: 32181
          configuration:
            max.request.size: 300000
            max.message.bytes: 300000
        bindings:
          fileWriteBindingInput:
            consumer:
              autoCommitOffset: false
      bindings:
        fileWriteBindingInput:
          binder: kafka
          destination: files.write
          group: ${spring.application.name}
          contentType: 'text/plain'

还有Java示例代码

@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

    // I Would like here to synchronize the processing of messages one by one
    // But, If many messages are pushed to this topic (single partition), they will be processed asynchronously event if I didn't yet acknowledge the current message

    acknowledgment.acknowledge();
}

我的配置中缺少什么?

我想,虽然消息没有被确认(偏移量没有增加),但没有其他消息从同一个分区消费。

【问题讨论】:

标签: spring-boot apache-kafka spring-cloud spring-cloud-stream


【解决方案1】:

如果启用autoCommitOffset(这是默认设置),则活页夹将已经确认每条记录。因此,当它到达您的StreamListener 时,记录已经被确认。

更正:上述关于StreamListener 的说法并不完全正确。当侦听器退出时,自动确认完成。

由于您只有一个分区,因此您将按照发送到该主题分区的顺序获取消息。 您可以禁用autoCommitOffset,在这种情况下,您可以使用手动确认。

【讨论】:

  • 谢谢。但事实上我已经这样做了。我已禁用 autoCommitOffset。我认为在确认未完成时不会再消耗任何消息。
  • 哦,好的。我没有在您的配置中注意到这一点。
  • &gt;So by the time, it gets to your StreamListener, the record is already acknowledged. - 不太正确;监听器退出时自动确认完成 - 与问题无关,但我想清除它。
  • 谢谢,@GaryRussell。更新了答案。
【解决方案2】:

不确认消息与停止传递下一条消息无关。

您不能将消息传递给另一个线程并稍后再确认;如果你想要单线程处理,你必须在监听线程上做所有的处理。

【讨论】:

  • 嗨,加里。也许 ack 方法不是好的解决方案。即使通过配置 max.poll.records 一条一条地消费消息,我也可以在日志中同时消费消息。例如,我将 4 条消息推送到单个分区主题,并由单个消费者使用它们。我可以在日志中看到所有消息都被消耗,即使第一个消息没有被确认。
  • 我的需要是仅在处理第一条消息后才处理第二条消息。这就是为什么我认为 ack 是解决方案。
  • 不清楚你的意思。从容器的角度来看,“已处理”意味着侦听器方法退出。确认只是在代理上提交偏移量,这意味着如果您重新启动应用程序,您将不会再次收到消息。如果您将消息交给另一个线程进行“处理”,您将收到下一条消息,而不管消费者属性如何。做你想做的事,你必须在退出监听器方法之前做所有的“处理”。
  • 对不起。处理、消费、处理、获取,对我来说都一样
  • 它适用于并发=1。我认为我的主要问题是由我的反应性使用引起的。我使用了 subscribe() 而不是 block()。我在这里发布了我的示例:github.com/oterrien/kafka-stream-for-stackoverflow。顺便说一句,谢谢你的帮助
【解决方案3】:

您可以将此消费者配置 max.poll.records 设置为 1,默认为 500

ma​​x.poll.records

在一次 poll() 调用中返回的最大记录数。

【讨论】:

  • 谢谢。但我没有看到任何结果。可能配置不好。我在这里设置了“max.pool.records”:spring.cloud.stream.kafka.binder.bindings.fileWriteBindingInput.consumer.configuration.max.poll.records: 1
  • I don't see any result 这是什么意思?这没有用?你得到的记录多于 1 条? @OlivierTerrien
  • 是的。我仍然同时消费许多消息。不过可能我的配置不好。
  • 去掉那个max.request.sizemax.message.bytes我试试,也参考@gray russell的回答
  • 再次感谢您。我会继续探索这条路。
猜你喜欢
  • 2023-03-31
  • 2014-10-25
  • 1970-01-01
  • 2019-09-26
  • 1970-01-01
  • 2018-07-27
  • 2020-04-04
  • 1970-01-01
  • 2017-02-23
相关资源
最近更新 更多