【Posted】: 2016-02-02 23:39:45
【Question】:
I have two Kafka brokers, server1:9092 and server2:9092, and I am using the Java client to send messages to this cluster with the following code:
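import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test; // assumption: the @Test annotation is JUnit 4

@Test
public void sendRecordToTopic() throws InterruptedException, ExecutionException {
    // See http://kafka.apache.org/documentation.html#newproducerconfigs
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "server1:9092,server2:9092");
    // "1" = wait for the partition leader only, not for the replicas
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    // Topic "my-replicated-topic", key "test", value "someValue"
    ProducerRecord<String, String> myRecord =
            new ProducerRecord<String, String>("my-replicated-topic", "test", "someValue");
    boolean syncSend = true;
    if (syncSend) {
        // Synchronous send: block until the broker acknowledges or the send fails
        producer.send(myRecord).get();
    } else {
        // Asynchronous send: return immediately without checking the result
        producer.send(myRecord);
    }
    producer.close();
}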
When one of the brokers is down, the test sometimes throws this exception (in this example, "server1" was down):
2015-11-02 17:59:29,138 WARN [org.apache.kafka.common.network.Selector] Error in I/O with server1/40.35.250.227
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
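To see which of the configured brokers is actually refusing connections, a plain TCP probe outside of Kafka may help. The following is only a minimal sketch using the JDK alone: the class name BrokerReachability, its two helper methods, and the 2000 ms timeout are hypothetical and not part of the Kafka client API. It merely splits a bootstrap.servers style string and tries to open a socket to each entry.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of the Kafka client library.
final class BrokerReachability {

    // Splits "host1:port1,host2:port2" into {host, port} string pairs.
    static List<String[]> parse(String bootstrapServers) {
        List<String[]> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            result.add(new String[] {
                trimmed.substring(0, colon), trimmed.substring(colon + 1)
            });
        }
        return result;
    }

    // Returns true if a TCP connection can be opened within timeoutMs milliseconds.
    // A refused connection, an unknown host, or a timeout all yield false.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same broker list as in the producer configuration above.
        for (String[] hostPort : parse("server1:9092,server2:9092")) {
            boolean up = isReachable(hostPort[0], Integer.parseInt(hostPort[1]), 2000);
            System.out.println(hostPort[0] + ":" + hostPort[1] + " reachable=" + up);
        }
    }
}
```

A successful TCP connection only shows that something is listening on the port; it does not prove the broker is healthy or is the leader for any partition of my-replicated-topic.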
【Discussion】:
Tags: java apache-kafka