【问题标题】:Spring Boot Kafka Consumer not consuming, Kafka Listener not triggeringSpring Boot Kafka Consumer 未消费,Kafka Listener 未触发
【发布时间】:2020-04-24 06:48:03
【问题描述】:

我正在尝试构建一个简单的 Spring Boot Kafka Consumer 来消费来自 kafka 主题的消息,但是由于 KafkaListener 方法没有被触发,因此没有消息被消费。

我在其他答案中看到,以确保 AUTO_OFFSET_RESET_CONFIG 设置为“最早”并且 GROUP_ID_CONFIG 是唯一的,我这样做了,但是 KafkaListenerMethod 仍然没有触发。应用程序只是启动并且不执行任何操作:

Application Started

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.4.RELEASE)

这是另一个 gradle 项目的子项目,该子项目的 build.gradle 如下(代码中已正确提供了 MAIN_CLASS_PATH):

apply plugin: 'application'

mainClassName = <MAIN_CLASS_PATH>

dependencies {
    compile "org.springframework.kafka:spring-kafka:${SpringKafkaVersion}"
    compile "org.springframework.boot:spring-boot-starter:${SpringBootVersion}"
    compile group: 'org.springframework', name: 'spring-tx', version: '5.2.2.RELEASE'
}

Java 类:

Start.java:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Start {
    public static void main(String[] args) {
        try {
            System.out.println("Application Started");
            SpringApplication.run(Start.class, args);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

KafkaConsumerConfig.java:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.UUID;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                <KAFKA_SERVER_ADDRESS>);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                UUID.randomUUID().toString());
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

KafkaConsumer.java

@Service
public class KafkaConsumer {

    @KafkaListener(topics = <TOPIC_NAME>)
    public void consume(@Payload ConsumerRecord<String, String> message) {
        System.out.println("Consumed message: " + message);
    }
}

我的代码中已正确提供了 KAFKA_SERVER_ADDRESS 和 TOPIC_NAME。我还检查了主题实际上已经包含消息。

关于为什么这不消耗来自 kafka 主题的任何消息的任何想法?

【问题讨论】:

  • 看起来不错,虽然你真的不需要定义消费者或容器工厂 - 启动会从应用程序属性(或 yaml)自动配置它们。启用 DEBUG 日志记录是否有帮助?
  • 嗨,Gary,根据您的建议,我添加了 DEBUG 级别的日志记录。
  • 有帮助吗? KafkaConsumer 类是否与Start 在同一个包(或子包)中?
  • 不要在 cmets 中放这样的东西;它渲染得不好;而是编辑问题,并在评论中说明您已这样做(我们不会收到编辑通知)。您应该会看到类似ktest24: partitions assigned: [ktest24-0, ktest24-9, ... 的信息日志。使用 DEBUG 日志记录,如果没有可获取的内容,您应该每 5 秒(默认情况下)看到 Received: 0 recordsCommit list: {}。如果您没有看到这些,则说明有些东西卡住了,您必须进行线程转储以查看使用者线程在做什么。

标签: java spring-boot apache-kafka spring-kafka


【解决方案1】:

Spring boot 和 spring kafka 具有自动配置功能。对于简单的消费者,删除您的配置类,并将此属性添加到您的 application.yml(或 .properties):

 spring.application.name: your-app-name
 spring.kafka.consumer.group-id: your-group-id
 spring.kafka.bootstrap-servers: server1:port1,server2:port2,server3:port3

【讨论】:

    【解决方案2】:

    问题是 kafka 连接问题。从 kafka 服务器消费的机器不允许从 kafka 服务器消费。将 kafka 服务器节点中的消费机器列入白名单解决了该问题。

    如果kafka server url可以解析但由于权限不足而无法连接,kafka消费者不会记录。这不是特定于 spring kafka,但即使是普通的 kafka 客户端消费者也没有记录这个。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-02-15
      • 1970-01-01
      • 2020-11-06
      • 2022-06-15
      • 1970-01-01
      • 2019-08-22
      • 2021-06-06
      • 2020-07-10
      相关资源
      最近更新 更多