【问题标题】:Apache kafka HighLevel Consumer - UnderstandingApache kafka 高级消费者——理解
【发布时间】:2015-06-05 06:51:22
【问题描述】:

我一直在寻找 Apache kafka 以实现 HighLevel 消费者(我不想玩消息,我只需要将数据放入 MongoDB)v0.8.1.1

我查看了以下链接,其中显示了有关如何实现消费者的非常详细的信息。

Apache Kafka consumer wiki Another kafka consumer

但我仍然对所有线程关闭后消费者如何重新启动一无所知。例如。假设我有 4 个消费者线程正在运行,它们消耗了来自 kafka 代理的所有消息,所以一旦没有消息,所有消费者将什么都不做,在特定超时后它将被关闭,所以我不确定消费者如何再次重新启动当 kafka 代理中有新消息时。

有人可以分享一些代码或至少有一些关于此的指针。还有一种方法可以让我们在一些回调方法中包含我们的业务逻辑,当有消息时调用该方法而不是使用 while 循环。

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    我认为您可能误解了关机期间超时的使用。从理论上讲,无论这些事件之间的时间间隔如何,您都在消耗无限的事件流,因此您的消费者应该永远关闭,除非您正在更新代码或机器崩溃。在实际需要关闭消费者的情况下,10000 毫秒超时的作用是让 Kafka 消费者有足够的时间将其最后读取的偏移量写入 ZooKeeper,以便当消费者重新启动时,它将从它处理的最后一个偏移量恢复。这种消费者关闭通常发生在您的程序关闭时(可能捕获到 InterruptedException),而不仅仅是消费者。因此,当您的程序重新启动时,consuner 会重新启动。

    编辑

    我应该补充一点,Kafka 的 ConsumerIterator 遵循这种永无止境的消费模式的原因。迭代器的 next 方法将 always 阻塞,直到它可以读取下一条消息。因此,达到示例中超时的唯一方法是消费者线程因某种异常而关闭。

    编辑 2

    我还没有看到任何支持回调的 Kafka 消费者 API 我认为你现在唯一的选择是编写自己的回调实现,例如:

    public interface Callback {
      void call(MessageAndMetadata message);
    }
    
    Executor executor = Executors.newCachedThreadPool();
    final Callback<byte[], byte[]> callback = new MyCallback();
    while (it.hasNext()) {
      final MessageAndMetadata message = it.next();
      executor.submit(new Runnable() {
        public void run() {
          callback.call(message);
        }
      });
    }
    

    您可能对他们是 currently rewriting the consumer API for Kafka 0.9 感兴趣,但我认为我没有在重写中看到回调(尽管我可能是错的)。

    【讨论】:

    • 好的,这意味着消费者永远不会关闭,除非程序关闭或发生一些异常......对吗?那么另一个问题呢,即我可以有一个回调方法而不是 while 循环吗?
    • 感谢您的澄清,您不认为文档有点混乱吗?
    • 是的,看起来有点不清楚。我认为消费者组教程中提供的第一个示例之后的部分应该定义行为:“这里有趣的部分是 while (it.hasNext()) 部分。基本上这段代码从 Kafka 读取,直到你停止它。” i> 事实上,如果主题中没有消息并且您从不明确停止消费者线程,Kafka 将永远阻塞。
    猜你喜欢
    • 1970-01-01
    • 2016-09-01
    • 2017-07-10
    • 1970-01-01
    • 1970-01-01
    • 2015-12-30
    • 1970-01-01
    • 1970-01-01
    • 2016-10-16
    相关资源
    最近更新 更多