【发布时间】:2016-10-17 20:11:39
【问题描述】:
我已经实现了一个循环分区器,如下所示:
public class KafkaRoundRobinPartitioner implements Partitioner {
private static final Logger log = Logger.getLogger(KafkaRoundRobinPartitioner.class);
final AtomicInteger counter = new AtomicInteger(0);
public KafkaRoundRobinPartitioner() {}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionsCount = partitions.size();
int partitionId = counter.incrementAndGet() % partitionsCount;
if (counter.get() > 65536) {
counter.set(partitionId);
}
return partitionId;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
现在我想测试每个分区是否有相同数量的消息。例如,如果我有 1 个具有 32 个分区的主题,并且我向该主题发送 32 条消息,我希望每个分区恰好有 1 条消息。
我想做如下的事情:
KafkaPartitions allPartitions = new KafkaTopic("topic_name");
for (KafkaPartition partition : allPartitions) {
int msgCount = partition.getMessagesCount();
// do asserts
}
据我所知,Kafka Java API 没有为我们提供这样的功能,但我可能在文档中丢失了一些东西。
有什么方法可以优雅的实现吗?
更新 我找到了一个基本的解决方案。由于我使用的是多消费者模型,因此我可以为每个消费者执行以下操作:
consumer.assignment().size();
之后我可以做:
consumer.poll(100);
并检查每个消费者是否有消息。在这种情况下,我不应该遇到一个消费者从其分区中为另一个消费者获取消息的情况,因为由于我拥有相同数量的消费者和分区,Kafka 应该以循环方式在消费者之间分配分区。
【问题讨论】:
标签: java apache-kafka integration-testing kafka-consumer-api