【问题标题】:Spring Boot Kafka Listener vs ConsumerSpring Boot Kafka 侦听器与消费者
【发布时间】:2019-08-25 02:42:39
【问题描述】:

有什么区别? KafkaConsumer 和 KafkaListener 可以互换使用吗?

【问题讨论】:

  • 它们在 Java 代码中不可互换。 KafkaConsumer 是 Kafka 客户端库中的一个类,它为应用程序提供接收消息的 API。 KafkaListener 是应用于方法的注释,因此 Spring Kafka 将调用它来处理消息。
  • 那么各自的作用是什么?

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

@KafkaListenerConcurrentMessageListenerContainer 的高级 API,它围绕 KafkaConsumer 生成多个内部侦听器。

不同之处在于,KafkaConsumer API 是可轮询,当您在需要时调用它的poll()。侦听器抽象将围绕poll() 进行无限循环,并且只要记录从poll() 出现,它就会为记录生成消息。我们有一个任务执行器,它运行如下逻辑:

        while (isRunning()) {
            try {
                pollAndInvoke();
            }
            catch (@SuppressWarnings(UNUSED) WakeupException e) {
                // Ignore, we're stopping
            }
            catch (NoOffsetForPartitionException nofpe) {
                this.fatalError = true;
                ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
                break;
            }
            catch (Exception e) {
                handleConsumerException(e);
            }
            catch (Error e) { // NOSONAR - rethrown
                Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
                if (runnable != null) {
                    runnable.run();
                }
                this.logger.error("Stopping container due to an Error", e);
                wrapUp();
                throw e;
            }
        }

KafkaConsumer.poll()pollAndInvoke(); 中被调用。

【讨论】:

  • 当我将属性spring.kafka.listener.concurrency设置为4时,会实例化4个消费者,还是4个监听器?
  • 所有消费者的监听器都是一样的。它是无状态的,所以不要产生更多的听众是完全可以的。并发性完全取决于您将从主题配置中为侦听器设置的分区数量。所以,如果你最终只有一个分区,那么并发配置就无所谓了。
猜你喜欢
  • 2020-11-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-08
  • 1970-01-01
  • 2016-12-09
  • 1970-01-01
  • 2018-02-18
相关资源
最近更新 更多