【发布时间】:2019-08-22 17:56:37
【问题描述】:
我正在尝试运行一个简单的 Spring Boot Kafka 应用程序,但我无法使其工作。我遵循了各种教程,现在我正在实现this one,但是当我启动应用程序时会发生这种情况:
我可以在控制台写,但消费者没有收到任何消息。
这是我的 SpringApplication 类:
@SpringBootApplication(scanBasePackages = "com.springmiddleware")
@ComponentScan("com.springmiddleware")
@EnableAutoConfiguration
@EntityScan("com.springmiddleware")
public class SpringMiddlewareApplication implements CommandLineRunner{
public static void main(String[] args) throws Exception {
SpringApplication.run(SpringMiddlewareApplication.class, args);
}
@Autowired
private Producer sender;
@Override
public void run (String... strings) {
sender.send("Hello world");
}
}
application.yml:
spring:
kafka:
bootstrap-servers: localhost:8080
app:
topic:
foo: foo.t
logging:
level:
root: ERROR
org.springframework.web: ERROR
com.memorynotfound: DEBUG
消费者类、生产者类及其配置类与教程中写的相同。
在我的 server.properties 文件中,我有:
zookeeper.connect=localhost:8080
在 zookeeper.properties 中:
clientPort=8080
在 application.yml 中指定的相同端口。在启动应用程序之前,我运行
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
和
.\bin\windows\kafka-server-start.bat config\server.properties
更新
这是ReceiverConfig 类:
@EnableKafka
@Configuration
public class ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
这是 SenderConfig 类:
@Configuration
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
这是 Consumer 类中的方法 listen
@KafkaListener(topics = "${app.topic.foo}")
public void listen(@Payload String message) {
System.out.println("Received " + message);
}
生产者类:
@Service
public class Producer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.topic.foo}")
private String topic;
public void send(String message){
kafkaTemplate.send(topic, message);
}
}
更新 2
[2019-04-01 17:23:52,492] 信息已建立会话 0x100435950880000,客户端协商超时 6000 /0:0:0:0:0:0:0:1:60079 (org.apache.zookeeper.服务器.ZooKeeperServer) [2019-04-01 17:23:52,539] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:创建 cxid:0x1 zxid:0xef txntype:-1 reqpath:n/a 错误路径:/消费者错误:KeeperErrorCode = /consumers 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,555] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x2 zxid:0xf0 txntype:-1 reqpath:n/a 错误路径:/brokers/ids 错误: KeeperErrorCode = /brokers/ids 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,555] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x3 zxid:0xf1 txntype:-1 reqpath:n/a 错误路径:/brokers/topics 错误: KeeperErrorCode = /brokers/topics 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,555] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x4 zxid:0xf2 txntype:-1 reqpath:n/a 错误路径:/config/changes 错误: KeeperErrorCode = NodeExists for /config/changes (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,570] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x5 zxid:0xf3 txntype:-1 reqpath:n/a 错误路径:/admin/delete_topics 错误: KeeperErrorCode = /admin/delete_topics 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,570] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:创建 cxid:0x6 zxid:0xf4 txntype:-1 reqpath:n/a 错误路径:/brokers/seqid 错误: KeeperErrorCode = /brokers/seqid 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,586] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x7 zxid:0xf5 txntype:-1 reqpath:n/a 错误路径:/isr_change_notification 错误:KeeperErrorCode = /isr_change_notification 的节点存在(org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,586] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:创建 cxid:0x8 zxid:0xf6 txntype:-1 reqpath:n/a 错误路径:/latest_producer_id_block 错误:KeeperErrorCode = /latest_producer_id_block 的节点存在(org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,586] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0x9 zxid:0xf7 txntype:-1 reqpath:n/a 错误路径:/log_dir_event_notification 错误:KeeperErrorCode = /log_dir_event_notification 的节点存在(org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,602] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0xa zxid:0xf8 txntype:-1 reqpath:n/a 错误路径:/config/topics 错误: KeeperErrorCode = /config/topics 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,602] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:创建 cxid:0xb zxid:0xf9 txntype:-1 reqpath:n/a 错误路径:/config/clients 错误: KeeperErrorCode = /config/clients 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,617] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0xc zxid:0xfa txntype:-1 reqpath:n/a 错误路径:/config/users 错误: KeeperErrorCode = /config/users 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:52,617] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:create cxid:0xd zxid:0xfb txntype:-1 reqpath:n/a 错误路径:/config/brokers 错误: KeeperErrorCode = /config/brokers 的 NodeExists (org.apache.zookeeper.server.PrepRequestProcessor) [2019-04-01 17:23:53,564] INFO 处理 sessionid 时出现用户级 KeeperException:0x100435950880000 类型:multi cxid:0x3a zxid:0xff txntype:-1 reqpath:n/a 中止剩余的多操作。错误路径:/admin/preferred_replica_election 错误:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
【问题讨论】:
-
使用监听器方法显示消费者配置和生产者配置类
-
课程更新
-
@Deadpool 匹配该行是正确的。 KafkaTemplate 在
Producer类的方法中使用。 -
你能添加
Producer类吗? -
@MadhuBhat 添加了有问题的生产者类
标签: java spring-boot apache-kafka