【问题标题】:creating one kafka consumer for several topics为多个主题创建一个 kafka 消费者
【发布时间】:2020-10-12 00:10:55
【问题描述】:

我想为多个主题创建单个 kafka 消费者。消费者的方法构造函数允许我为订阅内的主题列表传输参数,如下所示:

private Consumer createConsumer() {
    Properties props = getConsumerProps();
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    ArrayList<String> topicMISL = new ArrayList<>();
    for (String s:Connect2Redshift.kafkaTopics) {
        topicMISL.add(systemID + "." + s);
    }
    consumer.subscribe(topicMISL);
    return consumer;
}


private boolean consumeMessages( Duration duration, Consumer<String, byte[]> consumer) {
        try {  Long start = System.currentTimeMillis();
            ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(duration);
            }
   }

之后,我想每 3 秒将来自 kafka 的记录轮询到流中并处理它们,但我想知道这个消费者内部是什么 - 如何轮询来自不同主题的记录 - 首先是一个主题,然后是另一个主题,或者并行。会不会一直处理一个消息量大的topic,而另一个消息量少的topic会等待?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    一般情况下,这取决于您的主题设置。 Kafka 通过为每个主题使用多个分区进行扩展。

    • 如果您在 1 个主题上有 3 个分区,kafka 可以并行读取它们
    • 多个主题也是如此,阅读可以并行进行

    如果您有一个分区接收的消息比其他分区多得多,您可能会遇到此特定分区的消费者滞后的情况。调整批量大小和消费者设置可能会对他们有所帮助,也可以压缩消息。 理想情况下,确保平均分配负载可以避免这种情况。

    看看这篇博客文章,它让我对内部有了一个很好的了解:https://www.confluent.io/blog/configure-kafka-to-minimize-latency/

    【讨论】:

      【解决方案2】:
      ConsumerRecords<String, String> records = consumer.poll(long value);
          for (TopicPartition partition : records.partitions()) {
              List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
              for (ConsumerRecord<String, String> record : partitionRecords) {
                  
              }
              
          }
      

      还需要通过使用 consumer.commitSync 查找偏移量和提交来提交偏移量

      【讨论】:

        猜你喜欢
        • 2020-06-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-01-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多