【发布时间】:2021-08-30 19:00:45
【问题描述】:
如何在事务中执行以下操作。我的要求是,如果数据库调用失败,则不应将消息偏移量提交给 Kafka。Kafka 消费者配置在这里https://pastebin.com/kq5S9Jrx
@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack)
{
logger.debug(String.format("Message recieved -> %s", message));
// start transaction
dbservice.validateMessage(message);
dbservice.saveInDB(message);
ack.acknowledge();
// end transaction
}
【问题讨论】:
-
一般来说,建议使用 Kafka Connect 来处理数据库写入,而不是在您自己的代码中实现。
-
无论如何,
try { dbservice.saveInDB(message); ack } catch ()似乎不言自明 -
不,我们不使用 Kafka Connect。是的,我曾计划做同样的事情。检查是否有更好的解决方案,比如 chainedkafkatransactionmanager 。
-
AFAIK,该类明确用于 Kafka 事务,而不是外部数据库操作
标签: spring-boot apache-kafka kafka-consumer-api spring-kafka spring-transactions