【发布时间】:2020-05-19 23:11:04
【问题描述】:
我们正在使用 spring、kafka(spring kafka 2.4.5、spring cloud stream 3.0.1)、openshift。我们有以下配置。 多个broker/topic,每个topic有8个partition,复制因子为3,多个spring boot consumer。
当我们关闭一个代理作为弹性测试的一部分时,我们遇到了以下异常,即使在我们启动服务器之后,仍然会遇到同样的错误。
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2020-05-19 18:39:57.598 ERROR [service,,,] 1 --- [ad | producer-5] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='{49, 50, 49, 50, 54, 53, 56}' and payload='{123, 34, 115, 111, 117, 114, 99, 101, 34, 58, 34, 72, 67, 80, 77, 34, 44, 34, 110, 97, 109, 101, 34...' to topic topicname
2020-05-19 18:39:57.598 WARN [service,,,] 1 --- [ad | producer-5] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-5] Received invalid metadata error in produce request on partition topicname-4 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now
我查了谷歌,大多数人说将重试值更改为大于 1 会起作用,但由于即使在代理启动后也会出现错误,我不确定它是否有效。
这是我在属性文件中的内容:
spring.cloud.stream.kafka.binder.brokers=${output.server}
spring.cloud.stream.kafka.binder.requiredAcks=1
spring.cloud.stream.bindings.outputChannel.destination=${output.topic}
spring.cloud.stream.bindings.outputChannel.content-type=application/json
以及使用 kafka 流 API 发送消息的一行代码。
`client.outputChannel().send(MessageBuilder.withPayload(message).setHeader(KafkaHeaders.MESSAGE_KEY, message.getId().getBytes()).build());`
请帮帮我。
谢谢 公羊
【问题讨论】:
标签: spring-boot apache-kafka spring-kafka