【发布时间】:2020-10-12 00:10:55
【问题描述】:
我想为多个主题创建单个 kafka 消费者。消费者的方法构造函数允许我为订阅内的主题列表传输参数,如下所示:
private Consumer createConsumer() {
Properties props = getConsumerProps();
Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
ArrayList<String> topicMISL = new ArrayList<>();
for (String s:Connect2Redshift.kafkaTopics) {
topicMISL.add(systemID + "." + s);
}
consumer.subscribe(topicMISL);
return consumer;
}
private boolean consumeMessages( Duration duration, Consumer<String, byte[]> consumer) {
try { Long start = System.currentTimeMillis();
ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(duration);
}
}
之后,我想每 3 秒将来自 kafka 的记录轮询到流中并处理它们,但我想知道这个消费者内部是什么 - 如何轮询来自不同主题的记录 - 首先是一个主题,然后是另一个主题,或者并行。会不会一直处理一个消息量大的topic,而另一个消息量少的topic会等待?
【问题讨论】:
标签: java apache-kafka kafka-consumer-api