【发布时间】:2019-07-22 18:25:18
【问题描述】:
我正在查看一个 spring boot 服务,它从 apache kafka 读取消息,通过 http 从另一个服务请求消息指示的记录,处理它们,将一些数据保存到数据库中并将结果发布到另一个主题。
这是通过
完成的@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)
这是在几个服务中完成的,通常工作得很好。唯一的属性集是
spring.cloud.stream.binder.consumer.concurrency=20
主题本身有 20 个分区,应该适合。
在监控来自 kafka 的读取时,我们发现吞吐量非常低且行为异常:
该应用一次最多可读取 500 条消息,然后 1-2 分钟无内容。在此期间,消费者反复记录“丢失心跳,因为分区已重新平衡”,“重新分配分区”,有时甚至会抛出异常说“提交失败,因为轮询间隔已过”
我们得出结论,这意味着消费者获取 500 条消息,处理所有消息需要很长时间,错过了时间窗口,因此无法将 500 条消息中的任何一条提交给代理 - 代理重新分配分区并重新发送重复相同的消息。
查看线程和文档后,我找到了“max.poll.records”属性,但在哪里设置此属性的建议相互冲突。
有人说要设置在下面
spring.cloud.stream.bindings.consumer.<input>.configuration
有人说
spring.cloud.stream.kafka.binders.consumer-properties
我尝试将两者都设置为 1,但服务行为没有改变。
我该如何正确处理消费者无法在默认设置下跟上所需轮询间隔的情况?
通用yaml:
spring.cloud.stream.default.group=${spring.application.name}
服务-yaml
spring:
clould:
stream:
default:
consumer.headerMode: embeddedHeaders
producer.headerMode: embeddedHeaders
bindings:
someOutput:
destination: outTopic
someInput:
destination: inTopic
consumer:
concurrency: 30
kafka:
bindings:
consumer:
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
binder:
consumer-properties:
max.poll.records: 10 # this gets used first
configuration:
max.poll.records: 40 # this get used when the first one is not present
“忽略这个”总是意味着,如果没有设置其他属性,ConsumerConfiguration 保持默认为 500 以获取最大轮询记录
编辑:我们已经接近了:
问题与设置了exponentialBackoffStrategy 的spring 重试有关 - 以及一系列错误有效地停止了应用程序。
我没有得到的是,我们通过向相关主题发布格式错误的消息来强制出现 200 个错误,这导致应用程序读取 200 个,花费很长时间(使用旧的重试配置),然后一次提交所有 200 个错误。
如果我们有这有什么意义
max.poll.records: 1
concurrency: 1
ackEachRecod = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)
【问题讨论】:
-
您好,我也遇到了同样的问题。您能找到解决方法吗?
-
是的,答案是正确的 - 最大轮询记录数量过多会导致处理时间过长,从而使消费者超时,因此请将最大轮询设置得更低和/或增加轮询间隔
标签: apache-kafka spring-cloud spring-cloud-stream