【Question title】:Kafka Listener Method is not invoked. Consumer not consuming.
【Posted】:2017-10-04 19:54:11
【Question description】:

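I have a Kafka consumer that should consume from a single topic. I cannot use Spring Boot because I am integrating the Kafka consumer API into a Spring Core web application.

The Spring XML configuration is as follows: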

<bean id="kafkaConsumerProperties" class="com.azuga.kafka.listeners.KafkaConsumerProperties">
    <constructor-arg type="java.lang.String" value="127.0.0.1:9092" />
    <constructor-arg type="java.lang.String" value="tdm-group" />
    <constructor-arg type="java.lang.String" value="dbStreamer.azuga.tripDriverMapping" />
</bean>
<bean id="kafkaListenerConfig" class="com.azuga.kafka.listeners.KafkaListenerConfig">
    <property name="kafkaConsumerProperties" ref="kafkaConsumerProperties" />
</bean>
<bean id="kafkaContainerFactory" class="com.azuga.kafka.listeners.KafkaListenerContainerFactory"
    factory-method="kafkaContainerFactory">
</bean>

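This is the class that creates the ListenerContainerFactory:

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
public class KafkaListenerContainerFactory {

    // Called from XML through factory-method="kafkaContainerFactory"
    public static ConcurrentKafkaListenerContainerFactory<String, String> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    // Keys and values are both deserialized as strings
    @SuppressWarnings("unchecked")
    public static ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(KafkaListenerConfig.consumerProps(),
                KafkaListenerConfig.stringKeyDeserializer(), KafkaListenerConfig.stringKeyDeserializer());
    }

}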

This is my listener class, annotated with @KafkaListener:

package com.azuga.kafka.listeners;

import org.springframework.kafka.annotation.KafkaListener;

public class Listener {

    @KafkaListener(topics = "dbStreamer.azuga.tripDriverMapping")
    public void onMessage(String message) {
        System.out.println(message.toString());
    }
}

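This is the KafkaListenerConfig class, which takes the bootstrap servers, topic name, and so on:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
public class KafkaListenerConfig {

    private static KafkaConsumerProperties kafkaConsumerProperties;

    // Instance setter for XML property injection; it writes to the static field
    public void setKafkaConsumerProperties(KafkaConsumerProperties kafkaConsumerProperties) {
        KafkaListenerConfig.kafkaConsumerProperties = kafkaConsumerProperties;
    }

    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return props;
    }

    public static Deserializer stringKeyDeserializer() {
        return new StringDeserializer();
    }

}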

【Comments】:

    Tags: kafka-consumer-api spring-kafka


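    【Solution 1】:

    Your application configuration is a bit unusual.

    I assume you have missed the fact that @EnableKafka is meant for @Configuration classes. So, according to the Spring Framework documentation, you have to use the AnnotationConfigWebApplicationContext class: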

    * {@link org.springframework.web.context.WebApplicationContext WebApplicationContext}
     * implementation which accepts annotated classes as input - in particular
     * {@link org.springframework.context.annotation.Configuration @Configuration}-annotated
     * classes, but also plain {@link org.springframework.stereotype.Component @Component}
     * classes and JSR-330 compliant classes using {@code javax.inject} annotations. Allows
     * for registering classes one by one (specifying class names as config location) as well
     * as for classpath scanning (specifying base packages as config location).
    

    Unfortunately, this does not work with plain XML configuration.

    Spring Kafka does not provide any hooks for XML definitions.
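
A minimal sketch of what an annotation-based setup could look like, assuming the broker address and group id from the question. The class name KafkaConsumerConfig is hypothetical and not part of the original post; Listener is the class shown in the question. The factory bean is named kafkaListenerContainerFactory because that is the bean name @KafkaListener looks up by default, and the listener is registered as a bean so that its annotated method can be detected.

```java
package com.azuga.kafka.listeners;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical configuration class (name chosen for illustration only).
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Consumer settings copied from the question; adjust for your environment.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tdm-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }

    // "kafkaListenerContainerFactory" is the default factory bean name used by @KafkaListener.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    // The listener must be a Spring bean for its @KafkaListener method to be registered.
    @Bean
    public Listener listener() {
        return new Listener();
    }
}
```

To load such a class in a web application, one option is to set the contextClass context-param in web.xml to org.springframework.web.context.support.AnnotationConfigWebApplicationContext and contextConfigLocation to the fully qualified name of the configuration class.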

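    【Discussion】:

    • Thanks for the quick response. I did not know that it does not work with XML configuration. I did implement it using annotations, though, and it works like a charm.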