【问题标题】:Error handling - Consumer - apache kafka and spring错误处理 - 消费者 - apache kafka 和 spring
【发布时间】:2019-11-25 18:21:22
【问题描述】:

我正在学习使用 kafka,我有两个服务一个生产者和一个消费者。

生产者产生需要处理的消息(对服务和数据库的查询)。这些消息由消费者接收,它负责处理它们并将结果保存在数据库中

制片人

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
...
kafkaTemplate.send(topic, message);

消费者

@KafkaListener(topics = "....")
public void listen(@Payload String message) {
....
}

我希望消费者正确处理所有消息。 在这种情况下,我不知道如何处理消费者方面的错误。例如,数据库可能被暂时禁用,无法处理某些消息。

遇到这些情况该怎么办?

我知道责任属于消费者。 我可以重试,但是如果数据库关闭,则连续重试几次似乎不是一个好主意。如果我继续消费消息,索引会前进,我会丢失我无法处理的事件。

【问题讨论】:

    标签: spring error-handling apache-kafka


    【解决方案1】:

    您可以通过提交读取记录的偏移量来控制 kafka 消费者。除非提交偏移量,否则 Kafka 将继续返回相同的记录。您可以将偏移提交设置为手动,并根据您的业务逻辑是否成功决定是否提交。请参阅下面的示例

    Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }
    

    Consumer.commitsync() 提交偏移量。

    另请参阅 kakfa 消费者文档以了解消费者偏移量 here

    【讨论】:

    • 即使消费者没有提交,它也会在下一条消息上读取。提交部分发生在消费者死亡并再次出现时。如果消费者永远不会死,它会继续阅读下一条消息,无论你是否提交
    【解决方案2】:

    这个链接很有帮助https://dzone.com/articles/spring-for-apache-kafka-deep-dive-part-1-error-han

    Spring 提供了 DeadLetterPublishingRecoverer 类,该类执行正确的错误处理。

    【讨论】:

      猜你喜欢
      • 2023-01-10
      • 1970-01-01
      • 2021-05-06
      • 1970-01-01
      • 1970-01-01
      • 2020-12-14
      • 2018-12-17
      • 2015-06-05
      • 2017-08-13
      相关资源
      最近更新 更多