【问题标题】:How to use @Transactional with @KafkaListener?如何将@Transactional 与@KafkaListener 一起使用?
【发布时间】:2020-07-23 07:52:25
【问题描述】:

是否有可能将声明性 tx 管理(通过 @Transactional)与 @KafkaListener 注释方法一起使用? 例如,我想使用它来为每个侦听器定义单独的 tx 超时。 我的设置如下:

事务管理器:

@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
                                                                                 org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
  return new ChainedKafkaTransactionManager<>(
    kafkaTransactionManager, 
    hibernateTransactionManager);
}

KafkaListener:

@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}

问题是 - KafkaMessageListenerContainer 在调用此类方法之前创建自己的事务 - 它使用自己的 TransactionTemplate:

@Nullable
private TransactionTemplate determineTransactionTemplate() {
  return this.transactionManager != null
    ? new TransactionTemplate(this.transactionManager)
    : null;
}

不使用 TransactionInterceptor。那么如何为具体的@KafkaListener 方法设置特定的 tx 超时呢?

【问题讨论】:

    标签: java spring-boot spring-kafka spring-transactions


    【解决方案1】:

    通过普通的@Transactional 进行事务管理的各个方面无法与您所希望的那样与 Kafka 侦听器一起使用 - 没有内置交互。

    有一个专门的章节如何在事务中使用应用程序事件:Transaction Bound Event 使用注释 @TransactionalEventListener 有助于确保事务已成功完成。

    您可以使用@EventListener 注解注册一个常规事件监听器。如果需要将其绑定到事务,请使用@TransactionalEventListener。这样做时,监听器默认绑定到事务的提交阶段。

    【讨论】:

      【解决方案2】:

      可以做到,但有点复杂,因为你必须将消耗的偏移量发送到 Kafka 事务。

      您可以将KafkaTransactionManager 用于容器,将@Transactional 用于HibernateTransactionManager,而不是使用ChainedKafkaTransactionManager

      这将产生类似的结果,因为休眠 TX 将在 Kafka 事务之前提交(如果休眠提交失败,Kafka TX 将回滚)。

      编辑

      要将不同的链式 TM 配置到每个侦听器容器中,您可以执行以下操作。

      @Component
      class ContainerFactoryCustomizer {
      
          ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
                  ChainedKafkaTransactionManager<?, ?> chainedOne,
                  ChainedKafkaTransactionManager<?, ?> chainedTwo) {
              factory.setContainerCustomizer(
                      container -> {
                          String groupId = container.getContainerProperties().getGroupId();
                          if (groupId.equals("foo")) {
                              container.getContainerProperties().setTransactionManager(chainedOne);
                          }
                          else {
                              container.getContainerProperties().setTransactionManager(chainedTwo);
                          }
                      });
          }
      
      }
      

      每个链接的 TM 都有一个具有不同默认超时的 Hibernate TM。

      groupid@KafkaListener idgroupId 属性填充。

      【讨论】:

      • 加里·拉塞尔。谢谢。但是,您能否提供更详细的建议,如何为每个 KafkaListener 实现自定义 tx 超时?我仍然希望与休眠 tx 管理器同步。你说可以做到,但我不知道怎么做。
      • 虽然 Hibernate 有事务超时的概念,但 Kafka 事务中没有超时的概念。最简单的解决方案是让容器只管理 Kafka 事务并为 Hibernate TM 使用 @Transactional;结果将类似于事务同步。要在容器中使用链式 TM,您可以将每个容器配置为使用不同的链式 TM,其中 Hibernate TM 配置有不同的默认超时。请参阅我的答案的编辑。
      • 感谢您的详细解释。如果我选择第一个选项(没有链式 tm),我是否必须手动将消耗的偏移量发送到 kafka 事务?这个选项看起来很有趣,因为我可以使用@Transactional 来配置 hib。 Tm值。目前(使用链式 tm)此注释完全被忽略。我也考虑过定制 KafkaListenerContainerManager,但我必须完全重写它,因为用于启动事务的 TransactionTemplate 是最终的,不能修改:)
      • &gt;If I choose the first option (without chained tm) would I have to manually send consumed offsets to kafka transaction? 。不;容器不知道 DB tx 并将发送偏移量(如果 DB TX 提交)。如果 DB 提交失败,Kafka TX 将回滚(回滚后的处理器将重新寻找分区)。注释不是“完全忽略”,默认传播是REQUIRED,拦截器看到有一个现有事务,所以不需要启动另一个。仅当容器不是事务性的时才需要自己发送偏移量。
      • 正确;但对于仅限生产者的事务,仅使用事务同步会更简单。
      猜你喜欢
      • 2012-05-10
      • 1970-01-01
      • 1970-01-01
      • 2021-05-15
      • 2012-04-01
      • 2018-09-26
      • 2013-09-01
      • 1970-01-01
      相关资源
      最近更新 更多