【发布时间】:2020-11-26 00:11:42
【问题描述】:
我正在使用带有 Spring Listener 的 Kafka。以下是一段代码。 过去,我们发布了超过 100k 条消息来测试主题,系统似乎运行良好。 但是几天前,我更改了消费者的 groupId。之后,这个新消费者尝试从头开始处理所有消息,这需要大量时间。但是在某个时间之后(10 秒)可能是代理启动消费者。 所以结果没有 kafka 注册来收听消息。
@KafkaListener(
topicPattern = "test",
groupId = "test",
id = "test",
containerFactory = "testKafkaListenerContainerFactory")
public void consume(@Payload String payload) throws IOException {
}
Kafka 消费者配置:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put("security.protocol", "SSL");
然后我使用 cli 通过以下命令读取消息并观察到相同的行为。恰好 10 秒后,消费者停止读取来自 kafka 的消息。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
如何增加 kafka 客户端的请求超时时间或其他更好的方法来解决此问题?
【问题讨论】:
标签: java spring apache-kafka