【发布时间】:2021-11-11 16:45:25
【问题描述】:
我最近在学习Kafka,除非我指定--parititon 0参数,否则我的消费者无法消费任何记录。换句话说,我不能使用以下记录:
kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic
但工作方式如下:
kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic --partition 0
主要问题是,当我转移到 java 代码时,我的 KafkaConsumer 类无法获取记录,我需要知道如何在 java KafkaConsumer 中指定 partition 数字?!
我当前的java代码是:
public class ConsumerDemo {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger((ConsumerDemo.class.getName()));
String bootstrapServer = "127.0.0.10:9092";
String groupId = "my-kafka-java-app";
String topic = "first-topic";
// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
//properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// subscribe consumer to our topic
consumer.subscribe(Collections.singleton(topic)); //means subscribtion to one topic
// poll for new data
while(true){
//consumer.poll(100); old way
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records){
logger.info("Key: " + record.key() + ", Value: "+ record.value() );
logger.info("Partition: " + record.partition() + ", Offset: "+ record.offset());
}
}
}
}
【问题讨论】:
-
使用新鲜消费组时无法复制。您是否修改了任何 Kafka 服务器属性?您使用的是什么版本的 Kafka 和客户端?
-
换句话说,请显示
kafka-consumer-groups --bootstrap-server 127.0.0.10:9092 --group my-kafka-java-app --describe的输出... 为什么是127.0.0.10?此地址不是正确的环回 IP 或专用 LAN 地址 -
@OneCricketeer,关于IP,这是一个假设的IP,而在我们执行时我输入了服务器的真实姓名..这没什么大不了的
-
@OneCricketeer,关于命令的输出,它显示:
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. -
cloudera上的版本是
2.1.0-cdh6.2.1
标签: java apache-kafka kafka-consumer-api