Consumer - 消费者
文章目录
????消费者示例
// 配置请求参数
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "NewConsumer");
final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// consumer subscribe topics , 订阅支持正则表达式 例如Topic: my-topic.*(只有发生rebalance,新增的Topic才会被消费.)
kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(300));
for (ConsumerRecord<String, String> record : records) {
log.info("topic = {}, partition = {}, offset = {}, " + "key = {}, value = {}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
} finally {
// 最终关闭消费者.
kafkaConsumer.close();
}
Poll方法概述
Consumer的Poll方法,必须保持一直调用(没有数据也调用). 因为Poll方法使Consumer保持活性.
新消费者第一次调用Poll方法,负责发现GroupCoordinator. 通过GroupCoordinator加入消费者组,收到分配给它的分区.
Close方法概述
Close方法会释放网络资源,并且立即触发rebalance操作. 而不是等Kafka Cluster被动发现.
Offset提交方式
消费者消费数据之后,需要告诉kafka,它消费到了什么地方(offset).
Offset自动提交
Kafka存在自动提交偏移量机制props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);,默认情况下,5秒钟会触发. 但是可以通过 auto.commit.interval.ms进行配置,自动提交机制依赖于poll方法(保持心跳).因为每次调用poll方法会检查是否应该提交偏移量. 相同原理close方法也会提交偏移量.
Offset手动提交
手动提交需要把auto.commit.offset=false,这样consumer则不会进行自动提交. commit the last offset in the batch.
commitSync
同步提交的所有方法,按照功能分为两类,一类是指定特定分区,另一类是全部分区.同步提交会进行失败重试.
????同步提交示例
⚠️ 同步手动提高会降低吞吐量.
commitAsync
异步提交没有失败重试机制. 假如先提交2000 offset(失败), 然后提交3000 offset(成功),如果有重试机制,则会发生最后2000提交成功.
????异步提交示例
结合commitAsync和commitSync
指定Offset消费
指定Offset消费的时候,也需要指定Partition,因为Offset具体到Partition才有意义.
seekToBeginning方法是跳到指定Partition的Offset为0处,也就是从头开始消费.seekToEnd方法是跳到指定Partition的Offset的末尾处,当时数据的末端.而最后一个seek(TopicPartition,long)跳到指定Partition的指定Offset处消费.
⚠️ 在调用seek方法的时候,需要先获得分区的信息,而分区的信息要通过poll方法来获得. 如果调用seek方法时,没有分区信息,则会抛出IllegalStateException异常 No current assignment for partition xxxx.
???? 下图是使用seek方法示例
红框里是关键步骤, kafkaConsumer.poll(Duration.ofMillis(50000))是为了加入消费者组,获得Topic相关的信息.kafkaConsumer.assignment()获得分配给的分区.根据分配的分区进行偏移量重置. 如果重置的分区不在列表里面,则会抛出上面说的异常.
Consumer Exit
消费者的poll方法一般都是被while(true)语句包含的,那么退出就是一个问题.
- 在另外一个线程使用
consumer.wakeup();会抛出WakeupException异常,该异常不需要去捕获,它会自己发送LeaveGroup request.但是在之后需要手动关闭consumer. - 总的来说也是方法一的变型. 利用
Runtime.getRuntime().addShutDownHook(Thread);来调用consumer.wakeup();, ShutDownHook运行在另外一个线程中. 当按下ctrl+c的时候,触发ShutDownHook中传入的线程,最后运行完结束主程序.
???? 消费者安全退出示例
指定分区消费
指定一个主题的某些分区消费,不会触发rebalance.
假如下面例子
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
[Partition(topic = TwentyMillion, partition = 2, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = TwentyMillion, partition = 4, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: vfySGyF5TV6oAkf7kbh7hA
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Discovered group coordinator 192.168.0.251:9092 (id: 2147483647 rack: null)
日志表明分配到两个分区(共有5个分区), 最后clientId=consumer-1,groupId=TwentyConsumerManual.
现在有个问题,假如再启动一个消费者,而该消费者是用subscribe订阅,会产生什么现象?
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: vfySGyF5TV6oAkf7kbh7hA
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Discovered group coordinator 192.168.0.251:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Successfully joined group with generation 3
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Setting newly assigned partitions [TwentyMillion-0, TwentyMillion-4, TwentyMillion-3, TwentyMillion-2, TwentyMillion-1]
日志表明启动的消费者和手动指定分区启动的消费者是同一个,说明两者并不能协同工作.