【发布时间】:2016-12-02 09:05:59
【问题描述】:
您好,我是 kafka 的新手,我有一个简短的问题。
我实现了一个 kafka 生产者和消费者 zookeeper 和 producer 在另一台服务器上运行 (192.168.10.233) 消费者正在另一台服务器上运行 (192.168.10.234) 两者都是本地连接的
问题是 消费者与生产者建立连接但没有收听任何消息,但如果我将此监听部分移动到同一服务器 (192.168.10.233) ,它正在接收消息
这是我的消费者代码
def listen(): Unit = {
val props = new Properties();
props.put("bootstrap.servers", "192.168.10.233:9092");
props.put("group.id", "groupId");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
val consumer = new KafkaConsumer(props);
println("calling ---- but yet to receive the message")
consumer.subscribe(List("test"));
while (true) {
val records = consumer.poll(100);
for (record <- records)
println("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
我还从外面检查了192.168.10.233:9092,天气端口没有被任何东西阻塞。
【问题讨论】:
-
我认为可能是偏移的问题。您可以尝试设置偏移量,例如: props.put("auto.offset.reset", "earliest");如果有效,您可以调整该值
-
是的,我添加了偏移量,但仍然没有运气
-
您使用的是哪个版本的 Apache Kafka? 0.9 还是 0.10?您的客户端 API 应该具有相同的版本。
-
@NangSaigon 不一定。 Kafka 向后兼容,新代理可以处理来自旧客户端的消息。但反之则不然
-
我使用的是 0.10 版本
标签: apache-kafka kafka-consumer-api