【问题标题】:high level Kafka consumer api not work高级 Kafka 消费者 API 不起作用
【发布时间】:2016-10-16 08:57:25
【问题描述】:

我设置了一个单节点 kafka 并尝试了一个简单的发布/订阅模式:

从我的笔记本电脑上,我通过代码生成一些消息:

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.23.152:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 10; i++)
        producer.send(new ProducerRecord<String, String>("tp3", Integer.toString(i), "hello " + Integer.toString(i)));

    producer.close();

我还写了一个简单的消费者:

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.23.152:9092");
    props.put("group.id", "g1");
    props.put("client.id","client1");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "latest");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("tp3"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        TimeUnit.SECONDS.sleep(1000);
    }

但消费者没有检索到任何东西

谁能给我解释一下发生了什么? 我确信生产者工作得很好,因为我使用控制台命令来检索消息并且它工作得很好(我在这里附上了经过验证的图像)

任何帮助表示赞赏:( :( :(

【问题讨论】:

  • 如果人们想知道版本,它是 2.11-0.10.0.0(如屏幕截图所示)。我正在为同样的事情而苦苦挣扎。生产者工作正常,但消费者 API 不想做任何我做的事情。我运行的代码几乎和你一样,除了我使用 localhost 和“enable.auto.commit”为 false。
  • 我正在使用配置参数进行一些测试。我将轮询时间增加到 1000 和 5000,并且在运行消费者时,它确实有效 - 有时这里有一些计时问题,我还没有发现。
  • 我通过 Vpn 连接它们(我们的 Vpn 政策禁止许多端口),我没有被授予访问 9092 端口,所以我必须使用 sock 代理连接,我认为这是主要原因,我的疑问只是“为什么我在许多其他案例中使用 sock 代理并且它们运行良好:( :( 但在 Kafka 中失败了
  • 如果您想知道是否存在 vpn/防火墙问题,只需尝试 localhost 看看它是否有效。
  • tks @Yngvar Kristiansen,我试过了,效果很好

标签: apache-kafka kafka-consumer-api kafka-producer-api


【解决方案1】:

根据卡夫卡FAQ

为什么我的消费者从未获得任何数据?

默认情况下,当消费者第一次启动时,它会忽略一个主题中的所有现有数据,只会消费消费者启动后进来的新数据。如果是这种情况,请尝试在启动消费者后发送更多数据。或者,您可以通过将 auto.offset.reset 设置为 0.9 中新消费者的“最早”和旧消费者的“最小”来配置消费者。

【讨论】:

    猜你喜欢
    • 2016-09-01
    • 2017-07-10
    • 2015-06-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-03
    • 2016-05-29
    • 1970-01-01
    相关资源
    最近更新 更多