【问题标题】:Spring Boot Kafka consumer lags and reads wrongSpring Boot Kafka 消费者滞后并读错
【发布时间】:2020-02-27 05:32:09
【问题描述】:

我正在使用 Spring Boot 2.2 和 Kafka 集群(bitnami helm chart)。 并得到一些非常奇怪的行为。

在多个主题上拥有一些与多个消费者一起使用的 Spring Boot 应用程序。

调用 kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe -group my-app 给出:

GROUP           TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-app event.topic-a                 0          2365079         2365090         11              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app event.topic-a                 1          2365080         2365091         11              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app batch.topic-a                 0          278363          278363          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-a                 1          278362          278362          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-b                 0          1434            1434            0               consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47    consumer-5
my-app event.topic-b                 0          2530            2530            0               consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47    consumer-6
my-app batch.topic-c                 0          1779            1779            0               consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47    consumer-1
my-app event.topic-c                 0          12308           13502           1194            -                                               -               - 

再次调用它会给出

GROUP           TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-app event.topic-a                 0          2365230         2365245         15              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app event.topic-a                 1          2365231         2365246         15              consumer-4-0c9a5616-3e96-413b-b770-b813c3d38a28 /10.244.3.47    consumer-4
my-app batch.topic-a                 0          278363          278363          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-a                 1          278362          278362          0               consumer-3-14cb199e-646f-46ad-8ee2-98f37107fa37 /10.244.3.47    consumer-3
my-app batch.topic-b                 0          1434            1434            0               consumer-5-a2f940c8-75e6-43d2-8d79-77d03e1ad640 /10.244.3.47    consumer-5
my-app event.topic-b                 0          2530            2530            0               consumer-6-45a32d6d-eac9-4abe-b14f-47173338e62c /10.244.3.47    consumer-6
my-app batch.topic-c                 0          1779            1779            0               consumer-1-d935a29f-ad3c-4292-9ace-5efdfff864d6 /10.244.3.47    consumer-1
my-app event.topic-c                 0          12308           13505           1197            consumer-2-d52e2b96-f08c-4247-b827-4464a305cb20 /10.244.3.47    consumer-2

如您所见,event.topic-c 的使用者现在在那里,但滞后了 1197 个条目。 应用程序本身从主题中读取,但总是相同的事件(看起来像滞后量)但偏移量没有改变。 在 kafka 或 spring boot 上,我没有收到任何错误或日志条目。 我所拥有的只是针对该特定主题,一次又一次地处理相同的事件.....应用程序上的所有其他主题都正常工作。

这是客户端配置:

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = sap-integration
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

任何想法..我有点失落..

编辑: Spring 配置非常标准:

configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = MyJsonDeserializer::class.java
configProps[JsonDeserializer.TRUSTED_PACKAGES] = "*"

以下是日志中的一些示例:

2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603361, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584589, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=..., headers={kafka_offset=37603362, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6ca11277, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-c, kafka_receivedTimestamp=1572633584635, kafka_groupId=my-app}]]
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
2019-11-01 18:39:46.268 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {topic-c-0=OffsetAndMetadata{offset=37603363, leaderEpoch=null, metadata=''}}
....
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2019-11-01 18:39:51.475 DEBUG 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}

当消费者滞后时

GROUP           TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-app          topic-c              0          37603363        37720873       117510          consumer-3-2b8499c0-7304-4906-97f8-9c0f6088c469 /10.244.3.64    consumer-3

没有错误,没有警告..只是没有更多消息..

谢谢

【问题讨论】:

  • 你需要展示你的 Spring 配置。还要打开 DEBUG 日志记录以观察轮询和偏移提交。
  • 在 clinet 端或 kafka 端调试?
  • 客户端,最初。
  • 好的。希望这是正确的:“org.springframework.kafka”
  • 你应该从那开始——我们记录收到的记录以及何时提交偏移量。如果这对您的调查没有帮助,那么您也需要启用 org.apache.kafka - 但我会从 Spring 开始,因为 kafka 非常冗长。

标签: spring-boot apache-kafka spring-kafka


【解决方案1】:

你需要寻找这样的日志...

2019-11-01 16:33:31.825 INFO 35182 --- [ kgh1231-0-C-1] Oakccinternals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=kgh1231] (重新)加入组

...

2019-11-01 16:33:31.872 INFO 35182 --- [ kgh1231-0-C-1] osklKafkaMessageListenerContainer :分配的分区:[kgh1231-0,kgh1231-2,kgh1231-1,kgh1231-4, kgh1231-3]

...

2019-11-01 16:33:31.897 DEBUG 35182 --- [kgh1231-0-C-1] essageListenerContainer$ListenerConsumer:收到:10 条记录

...

2019-11-01 16:33:31.902 调试 35182 --- [ kgh1231-0-C-1] .a.RecordMessagingMessageListenerAdapter :处理 [GenericMessage [payload=foo1, headers={kafka_offset=80, kafka_consumer=org .apache.kafka.clients.consumer.KafkaConsumer@3d00c543, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kgh1231, kafka_receivedTimestamp=1572640411869}]]

...

2019-11-01 16:33:31.906 DEBUG 35182 --- [ kgh1231-0-C-1] .a.RecordMessagingMessageListenerAdapter :处理 [GenericMessage [payload=foo5, headers={kafka_offset=61, kafka_consumer=org .apache.kafka.clients.consumer.KafkaConsumer@3d00c543, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=3, kafka_receivedTopic=kgh1231, kafka_receivedTimestamp=1572640411870}]]

2019-11-01 16:33:31.907 调试 35182 --- [ kgh1231-0-C-1] essageListenerContainer$ListenerConsumer:提交列表:{kgh1231-0=OffsetAndMetadata{offset=82, metadata=''} , kgh1231-2=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-1=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-4=OffsetAndMetadata{offset=62, metadata=''}, kgh1231 -3=OffsetAndMetadata{offset=62, metadata=''}}

2019-11-01 16:33:31.908 调试 35182 --- [ kgh1231-0-C-1] essageListenerContainer$ListenerConsumer :提交:{kgh1231-0=OffsetAndMetadata{offset=82, metadata=''}, kgh1231-2=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-1=OffsetAndMetadata{offset=62, metadata=''}, kgh1231-4=OffsetAndMetadata{offset=62, metadata=''}, kgh1231- 3=OffsetAndMetadata{offset=62, metadata=''}}

如果您没有看到类似的内容,则说明您的消费者配置不正确。

如果您无法弄清楚,请将您的日志发布到 PasteBin 之类的地方。

【讨论】:

  • pastebin.com/yrSytSHD 不幸的是,您推荐的一切都在那里..即使客户端的配置对于应用程序的所有主题都完全相同..我只指定反序列化器,max.poll.records 和trusted_pa​​ckages .. 其他一切都是 spring boot 的默认设置 .. 我注意到的是,如果我启用调试日志记录,故障发生的速度比没有 ..
  • 这对我来说也没有意义;抱歉 - 显然偏移量正在正确提交,所以这不是 Spring 问题,但我不知道为什么后续的民意调查没有返回任何记录(除非 Consumer 一直是 pause()d)。即使发布者使用事务,您的消费者隔离也是read_uncommitted。显然,它移动了一些时间,因为随着时间的推移数字是不同的。是什么让您在最初的问题中想到> but always the same events ?已提交和结束的偏移量明显在进行中。
  • 进行了一点调查 .. 必须修复主题的分区,但没有改变 - 据我了解,我需要与收听该主题的消费者一样多的分区 .. 我的意思是原始问题..我有两个spring boot应用程序正在听主题c ..都以相同的方式配置.. ..两者都在消耗,应用程序A工作得很好..应用程序B,大约12小时后停止消耗数据..来自相同的主题 .. 每个应用程序都有自己的消费者组,但在两个分区上都有消费者 - 对吗?对我来说听起来有点奇怪
  • 你至少需要和消费者一样多的分区;一个消费者可以从多个分区消费。如果他们有不同的组,那么他们是独立的,两个组都应该得到所有的记录;一个消费者组将停止接收记录对我来说毫无意义(已接收:0 条记录)。
  • 对我来说 .. 也改为批处理以避免在处理记录时可能超时(这可能是一个问题吗?)......结果相似但更糟糕的是,还丢失了一些“数据” .. 重新启动应用程序后.. 是否可以有一个具有批处理的侦听器和一个具有“单个”的侦听器? Micronaut 支持类似的东西.. 目前我有一个带有 DefaultKafkaConsumerFactory 和 ConcurrentKafkaListenerContainerFactory 的 KafkaConsumerConfig,但所有设置都适用于所有消费者
【解决方案2】:

通过所有这些神秘地解决了这个问题:

  • 将 Kafka 升级到新版本
  • 将 Spring Boot 升级到更新版本
  • 提高软件性能
  • 切换到批处理
  • 添加 Health Check 并将其与 liveness probe 结合

它现在运行了一个多星期,没有再次出现该错误。

【讨论】:

    猜你喜欢
    • 2020-08-08
    • 1970-01-01
    • 1970-01-01
    • 2023-04-11
    • 2016-12-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多