If you plan to alter offsets from within a Java application, you can use the AdminClient method alterConsumerGroupOffsets.
Here is a simple example, tested with Kafka 2.8.0:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

// The statements below belong inside a method, e.g. main().
String brokers = "localhost:9092";
String consumerGroupName = "test1337";
TopicPartition topicPartition = new TopicPartition("test", 0);
long offset = 4L;

// Target offset for each partition to alter
Map<TopicPartition, OffsetAndMetadata> toOffset = new HashMap<>();
toOffset.put(topicPartition, new OffsetAndMetadata(offset));

// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);

try {
    // Check offsets before altering
    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsBeforeResetFuture =
            adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
    System.out.println("Before: " + offsetsBeforeResetFuture.get());

    // Alter offsets and block until the result for this partition is known
    adminClient.alterConsumerGroupOffsets(consumerGroupName, toOffset).partitionResult(topicPartition).get();

    // Check offsets after altering
    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsAfterResetFuture =
            adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
    System.out.println("After: " + offsetsAfterResetFuture.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} finally {
    // Always release the client's network resources
    adminClient.close();
}
This prints:
Before: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
After: {test-0=OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''}}
You can extend this example to load a CSV file that lists the consumer group, topic, partition, and new offset for each partition you want to change.
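As a rough sketch of that CSV idea, the parsing half might look like the following. Everything here is an assumption made for illustration: the class name OffsetCsvParser, the OffsetRow holder, and the file layout (one "group,topic,partition,offset" row per line, no header, no quoted fields, lines starting with # ignored). It uses only the JDK so it can run without a broker; the comment in main shows where the Kafka calls from the example above would go.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OffsetCsvParser {

    /** One CSV row: consumer group, topic, partition, and the new offset (hypothetical holder type). */
    public static final class OffsetRow {
        public final String group;
        public final String topic;
        public final int partition;
        public final long offset;

        OffsetRow(String group, String topic, int partition, long offset) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Parses one line of the assumed form "group,topic,partition,offset". */
    static OffsetRow parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + fields.length + ": " + line);
        }
        return new OffsetRow(fields[0].trim(), fields[1].trim(),
                Integer.parseInt(fields[2].trim()), Long.parseLong(fields[3].trim()));
    }

    /** Groups rows by consumer group, because alterConsumerGroupOffsets takes one group per call. */
    static Map<String, List<OffsetRow>> parse(List<String> lines) {
        Map<String, List<OffsetRow>> byGroup = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comment lines
            }
            OffsetRow row = parseLine(trimmed);
            byGroup.computeIfAbsent(row.group, g -> new ArrayList<>()).add(row);
        }
        return byGroup;
    }

    public static void main(String[] args) throws IOException {
        // Pass the CSV path as the first argument; without one, two inline sample rows are used.
        List<String> lines = args.length > 0
                ? Files.readAllLines(Paths.get(args[0]))
                : Arrays.asList("test1337,test,0,4", "test1337,test,1,10");
        for (Map.Entry<String, List<OffsetRow>> entry : parse(lines).entrySet()) {
            for (OffsetRow row : entry.getValue()) {
                // With Kafka on the classpath, each row would become
                //   toOffset.put(new TopicPartition(row.topic, row.partition), new OffsetAndMetadata(row.offset));
                // followed by one alterConsumerGroupOffsets(entry.getKey(), toOffset) call per group.
                System.out.println(entry.getKey() + " " + row.topic + "-" + row.partition + " -> " + row.offset);
            }
        }
    }
}
```

Grouping by consumer group first keeps it to one alter call per group rather than one per row. Keep in mind that, according to the AdminClient Javadoc, the group must be empty (no active consumers) for alterConsumerGroupOffsets to succeed.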