【发布时间】:2018-11-21 02:44:26
【问题描述】:
我已经在集群中配置了 3 个 kafka,我正在尝试使用 spring-kafka。
但是在我杀死 kafka 领导后,我无法将其他消息发送到队列。
我将 spring.kafka.bootstrap-servers 属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的 hosts 文件中的所有名称。
Kafka 0.10 版
有人知道如何正确配置吗?
编辑
我测试了一件事并发生了一个奇怪的行为。 当我启动服务时,我向主题发送消息(强制创建)
代码:
@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
return new KafkaSyncListener();
}
所以,这次我没有启动 kafka-1 服务器(只是其他服务器),它发生了异常:
org.springframework.kafka.core.KafkaProducerException:发送失败; 嵌套异常是 org.apache.kafka.common.errors.TimeoutException: 60000 毫秒后更新元数据失败。
似乎 spring-kafka 只是尝试连接第一个引导服务器。 我正在使用 spring-kafka 1.3.5.RELEASE 和 kafka 0.10.1.1
编辑 2
我已经完成了你所做的测试。当我删除领导者已更改的第一个 docker 容器(kafka-1)时,也会发生同样的情况。因此,我的消费者(弹簧服务)无法使用这些消息。 但是当我再次启动 kafka-1 时,服务会收到所有消息 我的消费者 ConcurrentKafkaListenerContainerFactory:
{
key.deserializer=class
org.apache.kafka.common.serialization.IntegerDeserializer,
value.deserializer=class
org.apache.kafka.common.serialization.StringDeserializer,
max.poll.records=500,
group.id=mongo-adapter-service,
ssl.keystore.location=/certs/kafka.keystore.jks,
bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
auto.commit.interval.ms=100,
security.protocol=SSL,
max.request.size=5242880,
ssl.truststore.location=/certs/kafka.keystore.jks,
auto.offset.reset=earliest
}
【问题讨论】: