【问题标题】:Confluent Control Center - Consumer is not listedConfluent 控制中心 - 未列出消费者
【发布时间】:2021-10-26 01:40:35
【问题描述】:

我有以下代码可以连接到 Kafka

Properties props = new Properties();
props.put("bootstrap.servers", "myconfluentkafkabroker:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_CG");
props.put("group.instance.id", "my_instance_CG_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("key.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
props.put("value.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));

KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("MyTopic"));
try {
    while (true) { 
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            log.debug("topic = %s, partition = %d, offset = %d,"
                customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());

            int updatedCount = 1;
            if (custCountryMap.countainsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount)

            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4));
        }
    }
} finally {
    consumer.close();
}

代码没有抛出任何错误,但我仍然没有看到列出的消费者

这会是个问题吗?

props.put("group.instance.id", "my_instance_CG_id");

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api confluent-control-center


    【解决方案1】:

    您应该使用 Kafka 提供的内置工具验证您看到的信息,例如 kafka-consumer-groups.sh

    您还需要实际轮询消息并提交偏移量,而不仅仅是在看到任何内容之前订阅。

    否则,对于特定的控制中心仪表板,可能需​​要您将 Monitoring Interceptors 添加到您的客户端中

    【讨论】:

    • 是的,我拉,但它正在打印 0 条消息
    • 好吧,那就用CLI工具来验证零延迟
    • 如何打印 ConsumerConfig 值?类似于 2021-04-01T00:44:32.367+05:30 [APP/PROC/WEB/0] [OUT] 2021-03-31 19:14:32.367 INFO 38 --- [http-nio-8080-exec -9] org.apache.kafka.clients.consumer.ConsumerConfig:ConsumerConfig 值:2021-04-01T00:44:32.367+05:30 [APP/PROC/WEB/0] [OUT] allow.auto.create.topics = true 2021-04-01T00:44:32.367+05:30 [APP/PROC/WEB/0] [OUT] auto.commit.interval.ms = 5000 2021-04-01T00:44:32.367+05:30 [ APP/PROC/WEB/0] [OUT] auto.offset.reset = 最新 2021-04-01T00:44:32.367+05:30 [APP/PROC/WEB/0] [OUT] bootstrap.servers = [17xxx:第9092章
    • 如果您正确设置了 SLF4J 记录器,这些应该会自动打印出来
    • 我不知道。去掉它。它有什么改变吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多