【问题标题】:How to increase request time for kafka consumer如何增加kafka消费者的请求时间
【发布时间】:2020-11-26 00:11:42
【问题描述】:

我正在使用带有 Spring Listener 的 Kafka。以下是一段代码。 过去,我们发布了超过 100k 条消息来测试主题,系统似乎运行良好。 但是几天前,我更改了消费者的 groupId。之后,这个新消费者尝试从头开始处理所有消息,这需要大量时间。但是在某个时间之后(10 秒)可能是代理启动消费者。 所以结果没有 kafka 注册来收听消息。

 @KafkaListener(
            topicPattern = "test",
            groupId = "test",
            id = "test",
            containerFactory = "testKafkaListenerContainerFactory")
    public void consume(@Payload String payload) throws IOException {
    }

Kafka 消费者配置:

Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put("security.protocol", "SSL");

然后我使用 cli 通过以下命令读取消息并观察到相同的行为。恰好 10 秒后,消费者停止读取来自 kafka 的消息。

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

如何增加 kafka 客户端的请求超时时间或其他更好的方法来解决此问题?

【问题讨论】:

    标签: java spring apache-kafka


    【解决方案1】:
    In the past we have published more than 1 lac message to test topic and system seems to be working fine.
    

    您的 Kafka 是否有所需的数据? Kafka 确实不会永远存储消息。消息的持续时间由代理中设置的保留期控制。如果您的数据超过保留期限,则会丢失。

    【讨论】:

    • 我的意思是我们定制了我们的保留政策来存储数据,而过去的平均值只是几天前。但这里的重点不是数据可用性。它与消费者的处理时间有关。
    • 假设您在控制台消费者中使用了从头开始标志,它将打印所有消息。它不会自己超时。这就是为什么我的第一个猜测是它只是简单地读取了所有数据(这比您的预期要少得多)
    猜你喜欢
    • 1970-01-01
    • 2019-11-14
    • 1970-01-01
    • 2020-10-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多