【问题标题】:Make kafka consumer run forever让 kafka 消费者永远运行
【发布时间】:2013-11-11 13:32:30
【问题描述】:

我写了一个high level Kafka consumer 作为 java 应用程序的一部分。

所以核心代码是这样的:

public void start() {
    ConsumerConnector consumerConnector = conf.getConsumerConnector();
    String topic = conf.getTopic();
    int numOfThereads = conf.getNumOfThreads();

    Map<String, Integer> topicCountMap = ImmutableMap.of(topic, numOfThereads);
    Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(topicCountMap);
    List<KafkaStream<Message>> streams = topicMessageStreams.get(topic);

    // create 4 threads to consume from each of the partitions
    executor = Executors.newFixedThreadPool(numOfThereads);

    // consume the messages in the threads
    for (final KafkaStream<Message> stream : streams) {
        executor.submit(new ConsumerThread(stream));
    }
}

为了测试我的消费者,我还创建了一个生产者,写信给 kafka,然后启动了我的消费者,它可以工作。 由于线程是在循环中执行的,我不确定我是否做对了。 我希望我的消费者永远运行并继续使用来自 kafka 的消息。

什么是让它永远运行的正确方法?

【问题讨论】:

  • 您如何迭代来自 Kakfa 的消息?根据他们的文档,他们说The interesting part here is the while (it.hasNext()) section. Basically this code reads from Kafka until you stop it.
  • 就我的测试而言,您似乎是对的,除非我停止,否则消费者会继续在 bg 中工作。随时将其发布为答案

标签: java multithreading apache-kafka


【解决方案1】:

@forhas 感谢您的确认

基本上从文档中他们迭代消费消息的方式如下

    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext())
        System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));

它还指出 The interesting part here is the while (it.hasNext()) section. Basically this code reads from Kafka until you stop it..

所以理想情况下,它应该一直运行,除非我们明确地杀死它,并且一旦产生了新消息,消费者端就可以使用它。

【讨论】:

    猜你喜欢
    • 2016-10-31
    • 2021-02-08
    • 2013-03-02
    • 2018-07-11
    • 2022-11-09
    • 2014-12-30
    • 2021-11-01
    • 1970-01-01
    • 2020-12-15
    相关资源
    最近更新 更多