【问题标题】:Spring kafka and Kafka ClusterSpring kafka 和 Kafka 集群
【发布时间】:2018-11-21 02:44:26
【问题描述】:

我已经在集群中配置了 3 个 kafka,我正在尝试使用 spring-kafka。

但是在我杀死 kafka 领导后,我无法将其他消息发送到队列。

我将 spring.kafka.bootstrap-servers 属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的 hosts 文件中的所有名称。

Kafka 0.10 版

有人知道如何正确配置吗?

编辑

我测试了一件事并发生了一个奇怪的行为。 当我启动服务时,我向主题发送消息(强制创建)

代码:

@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
    sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
    return new KafkaSyncListener();
}

所以,这次我没有启动 kafka-1 服务器(只是其他服务器),它发生了异常:

org.springframework.kafka.core.KafkaProducerException:发送失败; 嵌套异常是 org.apache.kafka.common.errors.TimeoutException: 60000 毫秒后更新元数据失败。

似乎 spring-kafka 只是尝试连接第一个引导服务器。 我正在使用 spring-kafka 1.3.5.RELEASE 和 kafka 0.10.1.1

编辑 2

我已经完成了你所做的测试。当我删除领导者已更改的第一个 docker 容器(kafka-1)时,也会发生同样的情况。因此,我的消费者(弹簧服务)无法使用这些消息。 但是当我再次启动 kafka-1 时,服务会收到所有消息 我的消费者 ConcurrentKafkaListenerContainerFactory:

{
  key.deserializer=class
  org.apache.kafka.common.serialization.IntegerDeserializer,
  value.deserializer=class
  org.apache.kafka.common.serialization.StringDeserializer,
  max.poll.records=500,
  group.id=mongo-adapter-service,
  ssl.keystore.location=/certs/kafka.keystore.jks,
  bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
auto.commit.interval.ms=100,
security.protocol=SSL,
max.request.size=5242880,
ssl.truststore.location=/certs/kafka.keystore.jks,
auto.offset.reset=earliest
}

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    服务器地址之间需要逗号,而不是分号。

    编辑

    我刚刚进行了一个没有问题的测试:

    spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
    

    @SpringBootApplication
    public class So50804678Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So50804678Application.class, args);
        }
    
        @KafkaListener(id = "foo", topics = "so50804678")
        public void in(String in) {
            System.out.println(in);
        }
    
        @Bean
        public NewTopic topic() {
            return new NewTopic("so50804678", 1, (short) 3);
        }
    
    }
    

    $ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
    Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: so50804678   Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
    

    杀死了首领,然后

    $ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
    Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
        Topic: so50804678   Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 1,2
    

    $ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9093 --topic so50804678
    

    发送了一条消息,并被应用程序接收;除了 WARN 之外,日志中没有错误:

    [Consumer clientId=consumer-1, groupId=foo] 无法建立到节点 0 的连接。经纪人可能不可用。

    然后我重新启动了死机;停止了我的应用程序;然后添加此代码...

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            while(true) {
                System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
                Thread.sleep(3_000);
            }
        };
    }
    

    再一次,杀死当前的领导者没有任何影响;一切恢复正常。

    您可能需要调整服务器道具中的 listeners/advertised.listeners 属性。由于我的经纪人都在本地主机上,我将它们保留为默认值。

    【讨论】:

    • 我已经完成了。所以现在,当领导倒下时,我可以将消息放入队列中,但消费者不能消费。但是如果我再次启动之前的领导者,消费者就可以接受消息。你知道问题吗?谢谢回答
    • 听起来主题可能没有在其他代理上复制。 $ kafka-topics --zookeeper localhost:2181 --describe --topic myTopic.
    • 我在 kafka-2 和 kafka-3 中执行了命令,都返回了:# ./kafka-topics.sh --zookeeper 172.17.0.1:2182 --describe --topic database_request Topic:database_request PartitionCount:1 ReplicationFactor:3 Configs: Topic: database_request Partition: 0 Leader: 3 Replicas: 1,3,2 Isr: 3,2 好像对吧?
    • Spring spring上的kafka集群配置该不该简单?就像只是用节点填充引导服务器,一切都应该工作吗?
    • 请看我的编辑 2。我没有话要感谢你。
    猜你喜欢
    • 2023-03-20
    • 2017-11-20
    • 2017-06-09
    • 2023-03-31
    • 2019-07-01
    • 2019-05-29
    • 2016-09-22
    • 1970-01-01
    • 2022-09-23
    相关资源
    最近更新 更多