【问题标题】:Spring Kafka Consumer with database带有数据库的 Spring Kafka Consumer
【发布时间】:2021-08-30 19:00:45
【问题描述】:

如何在事务中执行以下操作。我的要求是,如果数据库调用失败,则不应将消息偏移量提交给 Kafka。Kafka 消费者配置在这里https://pastebin.com/kq5S9Jrx

@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
    public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) 
    {
        logger.debug(String.format("Message recieved -> %s", message));
        
        // start transaction
        dbservice.validateMessage(message);
        
        dbservice.saveInDB(message);
        ack.acknowledge();
        // end transaction
}

【问题讨论】:

  • 一般来说,建议使用 Kafka Connect 来处理数据库写入,而不是在您自己的代码中实现。
  • 无论如何,try { dbservice.saveInDB(message); ack } catch () 似乎不言自明
  • 不,我们不使用 Kafka Connect。是的,我曾计划做同样的事情。检查是否有更好的解决方案,比如 chainedkafkatransactionmanager 。
  • AFAIK,该类明确用于 Kafka 事务,而不是外部数据库操作

标签: spring-boot apache-kafka kafka-consumer-api spring-kafka spring-transactions


【解决方案1】:

移动

dbservice.validateMessage(message);

dbservice.saveInDB(message);

使用@Transactional 注释的新方法。

然后

try {
    dbMethod(message);
    ack.ack();
catch (Exception e) {
    ack.nack(); // with an optional delay before redelivery
}

或者,只需使用容器管理的偏移量(无 ack/nack)并让异常传播到容器,SeekToCurrentErrorHandler 可以管理重试。

【讨论】:

  • Russel 我不知道SeekToCurrentErrorHandler。会看看它。另外可以告诉我是手动提交偏移量还是让kafka处理它更好的主意。因为这些消息是金融交易和丢失它们将转化为 $$ 的丢失,我不能丢失这些消息。
  • 容器将可靠地提交偏移量;在处理完民意调查的所有记录后使用AckMode.BATCH(默认);在处理每条记录后使用AckMode.RECORD。记录丢失不是问题。
  • 有关错误处理程序、发布死信记录等的信息,请参见此处docs.spring.io/spring-kafka/docs/current/reference/html/…
  • 只有最后一个问题。如果我按照我的问题所示实现消费者代码,消费者 API 可以使用消息两次吗?简而言之,它只是一次处理吗?
  • 1. 如果您的侦听器处理轮询返回的记录的时间过长,代理将强制重新平衡并且偏移量提交将失败。 2. 如果服务器在 DB 提交和偏移提交之间崩溃(例如电源故障)。为避免 #1,请确保 max.poll.interval.ms 的长度足以处理 max.poll.records。 #2 是一个小窗口,但概率非零。如果您将主题/分区/偏移量存储在数据库中,则可以使用它来检测重复项。
【解决方案2】:

在 Kafka 侦听器级别添加 @Transactional

@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
@Transactional
public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
....
}

参考:https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-02
    • 1970-01-01
    • 1970-01-01
    • 2020-11-09
    • 1970-01-01
    相关资源
    最近更新 更多