【发布时间】:2016-10-16 08:57:25
【问题描述】:
我设置了一个单节点 kafka 并尝试了一个简单的发布/订阅模式:
从我的笔记本电脑上,我通过代码生成一些消息:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.23.152:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>("tp3", Integer.toString(i), "hello " + Integer.toString(i)));
producer.close();
我还写了一个简单的消费者:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.23.152:9092");
props.put("group.id", "g1");
props.put("client.id","client1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "latest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("tp3"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
TimeUnit.SECONDS.sleep(1000);
}
但消费者没有检索到任何东西
谁能给我解释一下发生了什么? 我确信生产者工作得很好,因为我使用控制台命令来检索消息并且它工作得很好(我在这里附上了经过验证的图像)
任何帮助表示赞赏:( :( :(
【问题讨论】:
-
如果人们想知道版本,它是 2.11-0.10.0.0(如屏幕截图所示)。我正在为同样的事情而苦苦挣扎。生产者工作正常,但消费者 API 不想做任何我做的事情。我运行的代码几乎和你一样,除了我使用 localhost 和“enable.auto.commit”为 false。
-
我正在使用配置参数进行一些测试。我将轮询时间增加到 1000 和 5000,并且在运行消费者时,它确实有效 - 有时这里有一些计时问题,我还没有发现。
-
我通过 Vpn 连接它们(我们的 Vpn 政策禁止许多端口),我没有被授予访问 9092 端口,所以我必须使用 sock 代理连接,我认为这是主要原因,我的疑问只是“为什么我在许多其他案例中使用 sock 代理并且它们运行良好:( :( 但在 Kafka 中失败了
-
如果您想知道是否存在 vpn/防火墙问题,只需尝试 localhost 看看它是否有效。
-
tks @Yngvar Kristiansen,我试过了,效果很好
标签: apache-kafka kafka-consumer-api kafka-producer-api