【问题标题】:Kafka Consumer with JAVA使用 JAVA 的 Kafka 消费者
【发布时间】:2017-01-31 10:01:52
【问题描述】:

我有一个 Kafka-Broker,其中包含多个主题,每个主题都有一个分区。

我有一个消费者可以很好地消费来自主题的消息

我的问题是我需要通过增加分区数量来提高消息队列的吞吐量,比如我在一个主题上有四个分区,有没有办法可以写四个消费者,每个消费者都指向单个分区话题???

import java.util.*;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
   private ConsumerConnector consumerConnector = null;
   private final String topic = "mytopic";

   public void initialize() {
         Properties props = new Properties();
         props.put("zookeeper.connect", "localhost:2181");
         props.put("group.id", "testgroup");
         props.put("zookeeper.session.timeout.ms", "400");
         props.put("zookeeper.sync.time.ms", "300");
         props.put("auto.commit.interval.ms", "1000");
         ConsumerConfig conConfig = new ConsumerConfig(props);
         consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
   }

   public void consume() {
         //Key = topic name, Value = No. of threads for topic
         Map<String, Integer> topicCount = new HashMap<String, Integer>();       
         topicCount.put(topic, new Integer(1));

         //ConsumerConnector creates the message stream for each topic
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
               consumerConnector.createMessageStreams(topicCount);         

         // Get Kafka stream for topic 'mytopic'
         List<KafkaStream<byte[], byte[]>> kStreamList =
                                              consumerStreams.get(topic);
         // Iterate stream using ConsumerIterator
         for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();

                while (consumerIte.hasNext())
                       System.out.println("Message consumed from topic
                                     [" + topic + "] : "       +
                                       new String(consumerIte.next().message()));              
         }
         //Shutdown the consumer connector
         if (consumerConnector != null)   consumerConnector.shutdown();          
   }

   public static void main(String[] args) throws InterruptedException {
         KafkaConsumer kafkaConsumer = new KafkaConsumer();
         // Configure Kafka consumer
         kafkaConsumer.initialize();
         // Start consumption
         kafkaConsumer.consume();
   }

}

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    基本上,您需要做的就是启动几个都在同一个消费者组中的消费者。如果您使用的是 kafka 0.9 或更高版本的新消费者,或者您使用的是高级消费者,kafka 将负责划分分区,确保每个分区由一个消费者读取。如果你的分区比消费者多,那么一些消费者会收到来自多个分区的消息,但不会有多个分区被同一消费者组中的多个消费者读取,以确保消息不重复。所以你永远不会想要比分区更多的消费者,因为有些消费者会空闲。您还可以使用简单的消费者https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

    微调哪个消费者读取每个分区

    您似乎正在使用 Kafka 0.8 或更早版本的旧消费者。您可能需要考虑切换到新的消费者。 http://kafka.apache.org/documentation.html#intro_consumers

    这是另一篇不错的文章,其中包含使用新消费者编写消费者的详细示例:http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-12-12
      • 1970-01-01
      • 2020-03-29
      • 2020-09-19
      • 2017-01-04
      • 2018-06-04
      相关资源
      最近更新 更多