【问题标题】:Apache Kafka 0.9.0.0 Show all Topics with PartitionsApache Kafka 0.9.0.0 显示所有带有分区的主题
【发布时间】:2016-04-19 19:16:43
【问题描述】:

我目前正在评估 Apache Kafka,我有一个简单的消费者,它应该从特定主题分区读取消息。这是我的客户:

public static void main(String args[]) {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

    TopicPartition partition0 = new TopicPartition("test_topic", Integer.parseInt(args[0]));

    ArrayList topicAssignment = new ArrayList();
    topicAssignment.add(partition0);
    consumer.assign(topicAssignment);

    //consumer.subscribe(Arrays.asList("test_topic"));
    int commitInterval = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
            if (buffer.size() >= commitInterval) {
                process(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}

static void process(List<ConsumerRecord<String, String>> buffers) {
   for (ConsumerRecord<String, String> buffer : buffers) {
       System.out.println(buffer);
   }
}

这是我用来启动 Apache Kafka 的命令:

bin/kafka-server-start.sh config/server.properties & bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic

正如您在此处看到的,我正在创建具有 2 个分区(p0 和 p1)的主题!

然后我使用以下命令启动我的消费者的两个实例:

对于消费者 1:

java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 0

对于消费者 2:

java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 1

其中 0 和 1 代表我希望消费者从中读取消息的实际分区。

但是发生的情况是,只有我的消费者 1 收到了所有消息。我的印象是,来自生产者的消息最终在分区上是一样的。

我使用以下命令查看我的主题 test_topic 有多少个分区:

Joes-MacBook-Pro:kafka_2.11-0.9.0.0 joe$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info --group test --topic test_topic --zookeeper localhost:2181
[2016-01-14 13:36:48,831] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test            test_topic                     0   10000           10000           0               none
BROKER INFO
0 -> 172.22.4.34:9092

我跟Kafka说为test_topic创建2个partition,为什么只有一个partition?

这是我的制作人:

  def main(args: Array[String]) {
    //val conf = new SparkConf().setAppName("VPP metrics producer")
    //val sc = new SparkContext(conf)

    val props: Properties = new Properties()
      props.put("metadata.broker.list", "localhost:9092,localhost:9093")
      props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)

    1 to 10000 map {
      case i => 
        val jsonStr = getRandomTsDataPoint().toJson.toString
        println(s"sending message $i to kafka")
        producer.send(new KeyedMessage[String, String]("test_topic", jsonStr))
        println(s"sent message $i to kafka")
    }
  }

【问题讨论】:

  • 很有趣!我也想检查这个没有成功。当每个分区都有所有权但不知道如何分配所有权时,我已经看到了一些更详细的输出......

标签: apache-kafka


【解决方案1】:

您只是从分区 0 消费,但您还需要从分区 1 消费。如果您从 1 消费并提交,您将在 pid 列中看到 no 也没有 1。

但你还需要一个也写入 1 的生产者。

【讨论】:

  • 卡夫卡不应该通过循环方式将消息写入分区来自动执行此操作吗?我已经编辑了我的帖子以包括制作人。我应该如何以及如何使其写入分区?
  • 另外,我的消费者 2 正在从分区 1 读取数据。看看我上面帖子中的最后一个参数! “对于消费者 2:”
  • @sparkr 我错过了消费者 2 的那个。抱歉。我今天也在试验卡夫卡。我所经历的是,在填充 kafka 并同时让消费者 2 消费时,它只对我有用。我不明白分区何时切换。我正在使用 python api。生产者的默认值应该是它服务于两个分区......实际上在我的情况下它确实......有一个名为 random_start=True 的设置 //应该是默认值。
  • 解决办法是什么?我的意思是关于 Apache Kafka 的炒作似乎太多了,如果我必须努力让基本的东西正常工作,我不太相信!
  • 我想写入两个分区并使用消费者从这两个分区中读取,我希望消费者 1 读取 50% 的消息,消费者 2 读取剩余 50% 的消息。我应该在上面的代码中做什么来实现这一点?
【解决方案2】:

如果您使用 2 创建主题,我不确定为什么会有 1 个分区。我从来没有发生过,这是肯定的。

你可以试试这个: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic 这应该会告诉您实际存在多少个分区。

然后,如果真的有 1 个分区,也许您可​​以通过创建一个新主题重新开始: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic_2

然后尝试: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic_2 ...并报告调查结果。

【讨论】:

  • Joes-MacBook-Pro:kafka_2.11-0.9.0.0 joe$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic_2 Joes-MacBook-Pro:kafka_2 .11-0.9.0.0 乔$
  • 对不起,我错过了你的问题。这不适用于 Kafka 0.9,因为它不再将消费者偏移信息存储在 ZK 中,而是存储在一个特殊的 Kafka 主题中。我使用 Kafka Monitor 来监控我的消费者偏移量,所以也许你可以检查一下(适用于 0.9)。 Especially elegant if you're using Dockerdocker run -it -p 8080:8080 -e ZK=&lt;zookeeper_location&gt; jpodeszwik/kafka-offset-monitor:0.2.1。如果没有,去github repo看看如何手动运行。
猜你喜欢
  • 2019-02-25
  • 2016-04-23
  • 2015-09-12
  • 2014-06-28
  • 2016-04-18
  • 2016-08-25
  • 2018-08-05
  • 1970-01-01
  • 2015-07-03
相关资源
最近更新 更多