【问题标题】:ConnectException when one Kafka broker of cluster is down集群的一个 Kafka 代理关闭时发生 ConnectException
【发布时间】:2016-02-02 23:39:45
【问题描述】:

我有两个 Kafka 代理:server1:9092 和 server2:9092 我正在使用 Java 客户端向该集群发送消息,代码如下:

@Test
public void sendRecordToTopic() throws InterruptedException, ExecutionException {

    //See at http://kafka.apache.org/documentation.html#newproducerconfigs
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "server1:9092,server2:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

    ProducerRecord<String, String> myRecord =
            new ProducerRecord<String, String>("my-replicated-topic", "test", "someValue");

    boolean syncSend = true;

    if (syncSend) {
        //Synchronously send
        producer.send(myRecord).get();
    } else {
        //Asynchronously send
        producer.send(myRecord);
    }
    producer.close();
}

当其中一个代理关闭时,测试在某些情况下会抛出此异常(在此异常示例中,“server1”已关闭):

2015-11-02 17:59:29,138 警告 [org.apache.kafka.common.network.Selector] I/O 错误 server1/40.35.250.227 java.net.ConnectException:连接被拒绝: 没有更多信息 sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 在 sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 在 org.apache.kafka.common.network.Selector.poll(Selector.java:238) 在 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) 在 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) 在 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) 在 java.lang.Thread.run(Thread.java:745)

【问题讨论】:

    标签: java apache-kafka


    【解决方案1】:

    这是我解决问题的方法:

    • 至少需要3个ZooKeeper节点,我还要配置一个。这是因为 ZK 确定领导者的方式,它需要更多的 50% 的节点启动和运行。

    • 将此参数添加到 ZooKeeper 属性文件:

      tickTime=200

    使用其他参数需要此参数:

    initLimit=5
    syncLimit=2
    
    • 在Producer中添加这个属性:

      props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000");

    "RECONNECT_BACKOFF_MS_CONFIG" 属性只抛出一次 WARN(不是无限循环),然后发送消息

    【讨论】:

      【解决方案2】:

      我遇到了这个确切的问题,结果发现原因是对新配置属性之一的误解。

      在从以前的生产者 API 迁移时,我寻找与“topic.metadata.refresh.interval.ms”等效的 API,并选择了 ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG。然而,在尝试访问元数据被视为失败之前,这被证明是超时,并且由于我将其设置为几分钟,因此它阻止了故障转移的发生。

      将此设置为较低的值(我选择了 500 毫秒)似乎解决了我的问题。

      我相信我最初寻找的值是 ProducerConfig.METADATA_MAX_AGE_CONFIG 作为刷新元数据之前的超时,无论是否发生故障

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-06-09
        • 1970-01-01
        • 1970-01-01
        • 2017-08-21
        • 1970-01-01
        • 2021-01-09
        • 2020-05-08
        • 2021-08-12
        相关资源
        最近更新 更多