【问题标题】:Spring boot Kafka doesn't work - consumer not receiving messagesSpring boot Kafka 不起作用 - 消费者未收到消息
【发布时间】:2019-08-22 17:56:37
【问题描述】:

我正在尝试运行一个简单的 Spring Boot Kafka 应用程序,但我无法使其工作。我遵循了各种教程,现在我正在实现this one,但是当我启动应用程序时会发生这种情况:

我可以在控制台写,但消费者没有收到任何消息。
这是我的 SpringApplication 类:

@SpringBootApplication(scanBasePackages = "com.springmiddleware")
@ComponentScan("com.springmiddleware")
@EnableAutoConfiguration
@EntityScan("com.springmiddleware")
public class SpringMiddlewareApplication implements CommandLineRunner{



    public static void main(String[] args) throws Exception {

        SpringApplication.run(SpringMiddlewareApplication.class, args);

    }

    @Autowired
    private Producer sender;

    @Override 
    public void run (String... strings) {
        sender.send("Hello world");
    }

}

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:8080

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG

消费者类、生产者类及其配置类与教程中写的相同。
在我的 server.properties 文件中,我有:

zookeeper.connect=localhost:8080

在 zookeeper.properties 中:

clientPort=8080

在 application.yml 中指定的相同端口。在启动应用程序之前,我运行

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

.\bin\windows\kafka-server-start.bat config\server.properties

更新

这是ReceiverConfig 类:

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

这是 SenderConfig 类:

    @Configuration
public class SenderConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

这是 Consumer 类中的方法 listen

@KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        System.out.println("Received " + message);
    }

生产者类:

@Service
public class Producer {

     @Autowired
     private KafkaTemplate<String, String> kafkaTemplate;

     @Value("${app.topic.foo}")
        private String topic;

     public void send(String message){
            kafkaTemplate.send(topic, message);
        }
}

更新 2

[2019-04-01 17:23:52,492] 信息已建立会话 0x100435950880000,客户端协商超时 6000 /0:0:0:0:0:0:0:1:60079 (org.apache.zookeeper.服务器.ZooKeeperServer) [2019-04-01 17:23:52,539] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:创建 cxid:0x1 zxid:0xef txntype:-1 reqpath:n/a 错误路径:/消费者错误:KeeperErrorCode = /consumers 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,555] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x2 zxid:0xf0 txntype:-1 reqpath:n/a 错误路径:/brokers/ids 错误: KeeperErrorCode = /brokers/ids 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,555] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x3 zxid:0xf1 txntype:-1 reqpath:n/a 错误路径:/brokers/topics 错误: KeeperErrorCode = /brokers/topics 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,555] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x4 zxid:0xf2 txntype:-1 reqpath:n/a 错误路径:/config/changes 错误: KeeperErrorCode = NodeExists for /config/changes (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,570] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x5 zxid:0xf3 txntype:-1 reqpath:n/a 错误路径:/admin/delete_topics 错误: KeeperErrorCode = /admin/delete_topics 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,570] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:创建 cxid:0x6 zxid:0xf4 txntype:-1 reqpath:n/a 错误路径:/brokers/seqid 错误: KeeperErrorCode = /brokers/seqid 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,586] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x7 zxid:0xf5 txntype:-1 reqpath:n/a 错误路径:/isr_change_notification 错误:KeeperErrorCode = /isr_change_notification 的节点存在(org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,586] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:创建 cxid:0x8 zxid:0xf6 txntype:-1 reqpath:n/a 错误路径:/latest_producer_id_block 错误:KeeperErrorCode = /latest_producer_id_block 的节点存在(org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,586] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x9 zxid:0xf7 txntype:-1 reqpath:n/a 错误路径:/log_dir_event_notification 错误:KeeperErrorCode = /log_dir_event_notification 的节点存在(org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,602] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0xa zxid:0xf8 txntype:-1 reqpath:n/a 错误路径:/config/topics 错误: KeeperErrorCode = /config/topics 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,602] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:创建 cxid:0xb zxid:0xf9 txntype:-1 reqpath:n/a 错误路径:/config/clients 错误: KeeperErrorCode = /config/clients 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,617] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0xc zxid:0xfa txntype:-1 reqpath:n/a 错误路径:/config/users 错误: KeeperErrorCode = /config/users 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,617] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0xd zxid:0xfb txntype:-1 reqpath:n/a 错误路径:/config/brokers 错误: KeeperErrorCode = /config/brokers 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:53,564] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:multi cxid:0x3a zxid:0xff txntype:-1 reqpath:n/a 中止剩余的多操作。错误路径:/admin/preferred_replica_election 错误:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)

【问题讨论】:

  • 使用监听器方法显示消费者配置和生产者配置类
  • 课程更新
  • @Deadpool 匹配该行是正确的。 KafkaTemplate 在Producer 类的方法中使用。
  • 你能添加Producer类吗?
  • @MadhuBhat 添加了有问题的生产者类

标签: java spring-boot apache-kafka


【解决方案1】:

在您的 application.yml 中,您指定了 zookeeper 端口 而不是 kafka 代理端口

spring:
  kafka:
    bootstrap-servers: localhost:8080

在上面,你应该定义kafka broker的端口,即server.properties文件的port=的值。

Spring boot 应用默认运行在 8080 端口上,因此请不要对 Zookeeper 端口使用相同的端口,除非您更改了 Spring boot 应用的默认端口。

所以在server.properties中,有port=9092zookeeper.connect=localhost:2181,在application.yml中,有如下:

spring:
  kafka:
    bootstrap-servers: localhost:9092

然后在 zookeeper.properties 中,有clientPort=2181。然后依次重启 Zookeeper、Kafka 服务器和 Spring boot 应用。

更新:

较新版本的 Kafka 在 server.properties 文件中使用 listeners=PLAINTEXT://localhost:9092 而不是 port=9092。所以尝试替换它。

【讨论】:

  • 您好,在 server.properties 文件中我没有 port=,我必须手动添加吗?应该在哪个位置?
  • @TodorokiM server.properties 应该有一个 port 键来定义 kafka 代理应该在哪个端口上运行。查看此示例属性文件github.com/bkimminich/apache-kafka-book-examples/blob/master/…
  • port=9092添加到server.properties,将相同的端口添加到application.ymlspring: kafka: bootstrap-servers:,然后重启zookeeper、kafka server和你的spring boot app。
  • @TodorokiM 您可以尝试将server.properties 中的port=9092 替换为listeners=PLAINTEXT://localhost:9092 并重新启动zookeeper、kafka 和spring boot 应用程序吗?
  • @TodorokiM Kafka 旧版本使用port=,但新版本使用listeners=,这就是区别。
猜你喜欢
  • 2021-06-06
  • 1970-01-01
  • 2016-05-15
  • 2018-09-12
  • 1970-01-01
  • 2023-04-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多