[Posted]: 2018-12-20 19:25:34
[Question]:
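Problem description:
Our Kafka consumers (developed with Spring Boot 2.x) run for several days at a time. When we restart these consumers, all messages of the topic are consumed again, but only under specific conditions.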
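Conditions:
We suspect that the combination of the broker/topic configuration (log.retention.*, offsets.retention.*) and the consumer configuration (auto.offset.reset = earliest) is causing this behavior.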
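Obviously we cannot set the consumer to "latest": if the consumer is stopped and new messages arrive in the meantime, those messages will not be consumed when the consumer starts again.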
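The interaction between committed offsets and auto.offset.reset can be sketched as a simplified model: a valid committed offset for the group is always used, and auto.offset.reset only applies when the group has no committed offset for the partition or the committed offset is outside the retained log. The class and method names are hypothetical; this is an illustration of the documented behavior, not the Kafka client's actual code.

```java
// Hypothetical, simplified model of how a consumer picks its starting
// position for a partition when it (re)joins its group. Not Kafka's code.
public final class StartOffset {

    /**
     * @param committed last committed offset for the group, or null if none
     *                  exists (never committed, or already expired on the broker)
     * @param logStart  earliest offset still retained in the partition
     * @param logEnd    offset of the next message to be written
     * @param reset     value of auto.offset.reset ("earliest" or "latest")
     */
    static long startingOffset(Long committed, long logStart, long logEnd, String reset) {
        // A committed offset inside the retained range always wins;
        // auto.offset.reset is not consulted at all in that case.
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        // No usable committed offset: fall back to auto.offset.reset.
        switch (reset) {
            case "earliest":
                return logStart;  // replay everything still retained
            case "latest":
                return logEnd;    // skip everything written so far
            default:
                // Models "none" (or any other value): no position can be chosen.
                throw new IllegalStateException("no valid offset and reset = " + reset);
        }
    }

    public static void main(String[] args) {
        // Committed offset still present: resume where the group left off.
        System.out.println(startingOffset(950L, 0L, 1000L, "earliest")); // 950
        // Committed offset gone: "earliest" replays the whole retained topic.
        System.out.println(startingOffset(null, 0L, 1000L, "earliest")); // 0
        // Committed offset gone: "latest" skips messages that arrived while stopped.
        System.out.println(startingOffset(null, 0L, 1000L, "latest"));   // 1000
    }
}
```

In this model, "earliest" only causes a full replay when the group's committed offset is missing or out of range at restart time.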
Question:
What is the correct configuration to avoid this situation?
In the latest Kafka broker version (2.x), the default values of log.retention.* and offsets.retention.* are the same (https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days).
Would this new default configuration solve the problem?
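As a worked check of the defaults referenced in KIP-186 (log.retention.hours = 168; offsets.retention.minutes raised from 1440 to 10080 in Kafka 2.0), converting both to milliseconds shows they are now equal at 7 days. The class and field names are illustrative only.

```java
import java.util.concurrent.TimeUnit;

// Worked arithmetic for the broker defaults discussed in KIP-186.
public final class RetentionDefaults {
    // Default log.retention.hours = 168
    static final long LOG_RETENTION_MS = TimeUnit.HOURS.toMillis(168);
    // Default offsets.retention.minutes before Kafka 2.0 = 1440 (1 day)
    static final long OFFSETS_RETENTION_PRE_2_0_MS = TimeUnit.MINUTES.toMillis(1440);
    // Default offsets.retention.minutes since Kafka 2.0 (KIP-186) = 10080 (7 days)
    static final long OFFSETS_RETENTION_2_0_MS = TimeUnit.MINUTES.toMillis(10080);

    public static void main(String[] args) {
        System.out.println(LOG_RETENTION_MS);             // 604800000
        System.out.println(OFFSETS_RETENTION_PRE_2_0_MS); // 86400000
        System.out.println(OFFSETS_RETENTION_2_0_MS);     // 604800000
        System.out.println(LOG_RETENTION_MS == OFFSETS_RETENTION_2_0_MS); // true
    }
}
```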
Consumer configuration (offset committing is delegated to the Spring Cloud Stream framework):
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [server1:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = consumer_group1
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
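For reference, a minimal sketch of the subset of the consumer settings above that determines where a restarted consumer begins reading, expressed as plain java.util.Properties. In this setup Spring Cloud Stream builds the real configuration; the class and method names here are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: the settings from the dump above that decide where a
// restarted consumer starts reading. Illustration only; in the question's
// setup the Spring Cloud Stream binder supplies these values.
public final class ConsumerSettings {
    static Properties relevantSettings() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "server1:9092");
        // Committed offsets are stored per consumer group.
        props.put("group.id", "consumer_group1");
        // The client does not auto-commit; commits are left to the framework.
        props.put("enable.auto.commit", "false");
        // Consulted only when the group has no valid committed offset.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // With kafka-clients on the classpath, these could be passed to
        // new org.apache.kafka.clients.consumer.KafkaConsumer<byte[], byte[]>(props).
        relevantSettings().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```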
Broker configuration:
log.retention.ms = 86400000
log.retention.minutes = 10080
log.retention.hours = 168
log.retention.bytes = -1
offsets.retention.ms = 864000000
offsets.retention.minutes = 14400
offsets.retention.hours = 240
unclean.leader.election.enable = false
log.cleaner.enable = true
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.retention.check.interval.ms = 300000
log.cleaner.delete.retention.ms = 604800000
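Kafka documents that log.retention.ms, when set, takes precedence over log.retention.minutes, which in turn takes precedence over log.retention.hours (these are broker defaults; a topic-level retention.ms override would win over all three). A small sketch applying that rule to the broker values above (the helper name is hypothetical) shows that the effective time-based log retention is 1 day, while the listed offsets.retention.* values all correspond to 10 days.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the documented precedence for broker-level time-based log retention:
// log.retention.ms > log.retention.minutes > log.retention.hours.
// Topic-level retention.ms overrides are not modeled here.
public final class EffectiveRetention {

    /** Pass null for settings that are not configured. -1 means "no time limit". */
    static long effectiveLogRetentionMs(Long ms, Long minutes, long hours) {
        if (ms != null) {
            return ms;
        }
        if (minutes != null) {
            return TimeUnit.MINUTES.toMillis(minutes);
        }
        return TimeUnit.HOURS.toMillis(hours);
    }

    public static void main(String[] args) {
        // Values from the broker configuration above.
        long logMs = effectiveLogRetentionMs(86_400_000L, 10_080L, 168L);
        long offsetsMs = TimeUnit.MINUTES.toMillis(14_400); // offsets.retention.minutes
        System.out.println(logMs);     // 86400000  (1 day, not the 7 days of minutes/hours)
        System.out.println(offsetsMs); // 864000000 (10 days, same as offsets.retention.ms/hours)
    }
}
```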
Thanks and regards
[Discussion]:
-
You are right, this happens because the values of log.retention.* and offsets.retention.* differ. Please see stackoverflow.com/questions/50741783/…
Tags: java spring-boot apache-kafka spring-cloud-stream