【问题标题】:Kafka Consumer - Java ImplementationKafka 消费者 - Java 实现
【发布时间】:2016-02-28 20:10:35
【问题描述】:

我遇到了这个link,用于在 Java 中实现 KafkaConsumer。下面列出的代码读取流并处理消息。一旦启动,此代码会继续收听传入的消息吗?如果是这样,它如何继续运行并继续消费消息?

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}

【问题讨论】:

  • 我想知道他们为什么决定称它为 Kafka。
  • 卡夫卡起源于捷克语。卡夫卡的意思是“像一只鸟”。也许是一只“鸽子”,将消息从源传递到目的地;)
  • 多么有趣!我会说一点(很少)捷克语,但我不知道,而且我的字典里没有。

标签: java spring apache-kafka


【解决方案1】:

代码一旦启动就会一直监听传入的消息,因为它使用ThreadPool(线程池中的线程数=a_numThreads)并且线程池中的每个线程都执行一个消费者(ConsumerTest) .

如果您仔细查看ConsumerTest 类,您会发现有一个无限循环。 it.hasNext() 处于阻塞状态(请参阅 ConsumerIterator),因此消费者将始终等待下一条消息:

public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
}

话虽如此,如果在指定的时间间隔后没有消息可供消费,则可以使用属性consumer.timeout.ms Throw 设置超时异常,但默认值为 - 1(无超时),因此默认情况下,消费者将一直持续消费消息,直到消费者和执行者关闭(参见示例中的shutdown 方法)。

【讨论】:

  • 谢谢琼!迭代器是 Kafka 提供的一种实现,会导致程序阻塞并等待下一条消息吗?
  • 没错,迭代器是在 Kafka 中实现的,并由 java BlockingQueue 支持。 “一个迭代器,它阻塞直到可以从提供的队列中读取一个值。迭代器接受一个可以添加到队列中以触发关闭的 shutdownCommand 对象”(github.com/kafka-dev/kafka/blob/master/core/src/main/scala/…)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-03-06
  • 1970-01-01
  • 2020-08-20
  • 2015-12-12
  • 2017-01-31
相关资源
最近更新 更多