[Question Title]: Spring Cloud Stream Multi Topic Transaction Management
[Posted]: 2019-12-14 05:10:40
[Question]:

I am trying to create a PoC application in Java to learn how transaction management works in Spring Cloud Stream when publishing messages to Kafka. The use case I am trying to simulate is a processor that receives a message, does some processing, and produces two new messages destined for two different topics. I want to be able to publish both messages as a single transaction, so if publishing the second message fails, I want the first message to be rolled back (not committed). Does Spring Cloud Stream support such a use case?

I added the @Transactional annotation, and I can see a global transaction starting before the message is handed to the consumer. However, when I try to publish a message via the MessageChannel.send() method, I can see that a new local transaction is started and completed in the handleRequestMessage() method of the KafkaProducerMessageHandler class. This means that sending the message does not participate in the global transaction, so if an exception is thrown after the first message is published, that message is not rolled back. The global transaction is rolled back, but that accomplishes nothing, because the first message has already been committed.
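For reference, the all-or-nothing semantics being asked for can be illustrated without Kafka at all. The sketch below (a hypothetical `TwoTopicTransaction` class, not a Spring or Kafka API) buffers both publishes and makes them visible only on commit; an abort after the first send leaves the "broker" untouched:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of all-or-nothing publication to two topics.
public class TwoTopicTransaction {
    private final Map<String, List<String>> topics;               // the "broker"
    private final Map<String, List<String>> pending = new HashMap<>();

    public TwoTopicTransaction(Map<String, List<String>> topics) {
        this.topics = topics;
    }

    // Buffer the record; nothing is visible to consumers yet.
    public void send(String topic, String payload) {
        pending.computeIfAbsent(topic, t -> new ArrayList<>()).add(payload);
    }

    // Make every buffered record visible atomically.
    public void commit() {
        pending.forEach((t, records) ->
                topics.computeIfAbsent(t, k -> new ArrayList<>()).addAll(records));
        pending.clear();
    }

    // Discard everything buffered in this transaction.
    public void abort() {
        pending.clear();
    }

    public static void main(String[] args) {
        Map<String, List<String>> broker = new HashMap<>();

        // Failure case: simulate the second publish failing, so we abort.
        TwoTopicTransaction tx = new TwoTopicTransaction(broker);
        try {
            tx.send("name-changes", "new-name");
            throw new IllegalStateException("email publish failed");
        } catch (IllegalStateException ex) {
            tx.abort();
        }
        System.out.println("after abort: " + broker);   // after abort: {}

        // Success case: both sends commit together.
        TwoTopicTransaction tx2 = new TwoTopicTransaction(broker);
        tx2.send("name-changes", "new-name");
        tx2.send("email-changes", "new-email");
        tx2.commit();
        System.out.println("after commit: " + broker.keySet());
    }
}
```

This is exactly the behavior I expected the global transaction to give me across both `MessageChannel.send()` calls.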

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: txn.
            producer: # these apply to all producers that participate in the transaction
              partition-key-extractor-name: partitionKeyExtractorStrategy
              partition-selector-name: partitionSelectorStrategy
              partition-count: 3
              configuration:
                acks: all
                enable:
                  idempotence: true
                retries: 10
        bindings:
          input-customer-data-change-topic:
            consumer:
              configuration:
                isolation:
                  level: read_committed
              enable-dlq: true
      bindings:
        input-customer-data-change-topic:
          content-type: application/json
          destination: com.fis.customer
          group: com.fis.ec
          consumer:
            partitioned: true
            max-attempts: 1
        output-name-change-topic:
          content-type: application/json
          destination: com.fis.customer.name          
        output-email-change-topic:
          content-type: application/json
          destination: com.fis.customer.email
@SpringBootApplication
@EnableBinding(CustomerDataChangeStreams.class)
public class KafkaCloudStreamCustomerDemoApplication
{
   public static void main(final String[] args)
   {
      SpringApplication.run(KafkaCloudStreamCustomerDemoApplication.class, args);
   }
}
public interface CustomerDataChangeStreams
{
   @Input("input-customer-data-change-topic")
   SubscribableChannel inputCustomerDataChange();

   @Output("output-email-change-topic")
   MessageChannel outputEmailDataChange();

   @Output("output-name-change-topic")
   MessageChannel outputNameDataChange();
}
@Component
public class CustomerDataChangeListener
{
   @Autowired
   private CustomerDataChangeProcessor mService;

   @StreamListener("input-customer-data-change-topic")
   public void handleCustomerDataChangeMessages(
      @Payload final ImmutableCustomerDetails customerDetails)
   {
      mService.processMessage(customerDetails);
   }
}
@Component
public class CustomerDataChangeProcessor
{
   private final CustomerDataChangeStreams mStreams;

   @Value("${spring.cloud.stream.bindings.output-email-change-topic.destination}")
   private String mEmailChangeTopic;

   @Value("${spring.cloud.stream.bindings.output-name-change-topic.destination}")
   private String mNameChangeTopic;

   public CustomerDataChangeProcessor(final CustomerDataChangeStreams streams)
   {
      mStreams = streams;
   }

   public void processMessage(final CustomerDetails customerDetails)
   {
      try
      {
         sendNameMessage(customerDetails);
         sendEmailMessage(customerDetails);
      }
      catch (final JSONException ex)
      {
         LOGGER.error("Failed to send messages.", ex);
      }
   }

   public void sendNameMessage(final CustomerDetails customerDetails)
      throws JSONException
   {
      final JSONObject nameChangeDetails = new JSONObject();
      nameChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
      nameChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
      nameChangeDetails.put(KafkaConst.FIRST_NAME_KEY, customerDetails.firstName());
      nameChangeDetails.put(KafkaConst.LAST_NAME_KEY, customerDetails.lastName());
      final String action = customerDetails.action();
      nameChangeDetails.put(KafkaConst.ACTION_KEY, action);

      final MessageChannel nameChangeMessageChannel = mStreams.outputNameDataChange();
      nameChangeMessageChannel.send(MessageBuilder.withPayload(nameChangeDetails.toString())
         .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
         .setHeader(KafkaHeaders.TOPIC, mNameChangeTopic).build());

      if ("fail_name_illegal".equalsIgnoreCase(action))
      {
         throw new IllegalArgumentException("Customer name failure!");
      }
   }

   public void sendEmailMessage(final CustomerDetails customerDetails) throws JSONException
   {
      final JSONObject emailChangeDetails = new JSONObject();
      emailChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
      emailChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
      emailChangeDetails.put(KafkaConst.EMAIL_ADDRESS_KEY, customerDetails.email());
      final String action = customerDetails.action();
      emailChangeDetails.put(KafkaConst.ACTION_KEY, action);

      final MessageChannel emailChangeMessageChannel = mStreams.outputEmailDataChange();
      emailChangeMessageChannel.send(MessageBuilder.withPayload(emailChangeDetails.toString())
         .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
         .setHeader(KafkaHeaders.TOPIC, mEmailChangeTopic).build());

      if ("fail_email_illegal".equalsIgnoreCase(action))
      {
         throw new IllegalArgumentException("E-mail address failure!");
      }
   }
}

EDIT

We are getting closer. The local transaction is no longer created. However, the global transaction is still committed even when an exception occurs. As far as I can tell, the exception does not propagate to the TransactionTemplate.execute() method, so the transaction is committed. It seems that the MessageProducerSupport class "swallows" the exception in a catch clause inside its sendMessage() method: if an error channel is defined, a message is published to it, and the exception is not rethrown. I tried to turn the error channel off (spring.cloud.stream.kafka.binder.transaction.producer.error-channel-enabled = false), but that did not disable it. So, just as a test, I set the error channel to null in the debugger to force the exception to be rethrown. That seemed to do it. However, the original message keeps being redelivered to the initial consumer even though I set max-attempts to 1 for that consumer.
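The behavior described above boils down to one branch: if an error channel is configured, the failed send is reported there and the exception is not rethrown, so it never reaches `TransactionTemplate.execute()` and the transaction commits. A simplified, broker-free illustration of that control flow (a hypothetical `GuardedSender`, not the actual `MessageProducerSupport` source):

```java
import java.util.function.Consumer;

// Simplified model of "publish to the error channel instead of rethrowing".
public class GuardedSender {
    private final Consumer<Exception> errorChannel; // null = no error channel

    public GuardedSender(Consumer<Exception> errorChannel) {
        this.errorChannel = errorChannel;
    }

    public void send(Runnable publish) {
        try {
            publish.run();
        } catch (RuntimeException ex) {
            if (errorChannel != null) {
                errorChannel.accept(ex);  // swallowed: the caller sees success
            } else {
                throw ex;                 // propagated: the transaction can roll back
            }
        }
    }
}
```

With an error channel in place, the surrounding transaction template sees a normal return and commits; with the error channel set to null, the exception propagates, which is why forcing it to null in the debugger made the rollback happen.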

[Question discussion]:

    Tags: java apache-kafka spring-kafka spring-cloud-stream


    [Solution 1]:

    See the documentation:

    spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

    Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.

    Default: null (no transactions)

    spring.cloud.stream.kafka.binder.transaction.producer.*

    Global producer properties for producers in a transactional binder. See spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix, Kafka Producer Properties, and the general producer properties supported by all binders.

    Default: see the individual producer properties.

    You must configure the shared global producer.

    Do NOT add @Transactional - the container will start the transaction and send the offsets to the transaction before committing it.

    If the listener throws an exception, the transaction is rolled back and the DefaultAfterRollbackProcessor will re-seek the topics/partitions so that the record is redelivered.
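That rollback-and-re-seek behavior can be pictured with a toy offset model: on failure the consumer's position is not advanced, so the next poll returns the same record again. This is a conceptual sketch only, not the DefaultAfterRollbackProcessor implementation:

```java
import java.util.List;

// Toy model of rollback + re-seek: the offset only advances on success.
public class ReseekDemo {
    public static void main(String[] args) {
        List<String> partition = List.of("one", "two", "three");
        int committedOffset = 0;
        int attemptsOnTwo = 0;

        while (committedOffset < partition.size()) {
            String record = partition.get(committedOffset);
            try {
                if (record.equals("two") && attemptsOnTwo++ < 2) {
                    throw new RuntimeException("fail");   // listener throws
                }
                committedOffset++;                        // commit: offset moves on
            } catch (RuntimeException ex) {
                // rollback: offset unchanged, so "two" is redelivered on the next poll
            }
        }
        System.out.println("attempts on 'two': " + attemptsOnTwo); // prints 3
    }
}
```

This is also why the questioner saw the record keep coming back despite max-attempts: 1 - redelivery after a rollback is driven by the re-seek, not by the binder's retry setting.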

    EDIT

    There is a bug in the configuration of the binder's transaction manager that causes the output binding to start a new local transaction.

    To work around it, reconfigure the TM with the following container customizer bean...

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
        return (container, dest, group) -> {
            KafkaTransactionManager<?, ?> tm = (KafkaTransactionManager<?, ?>) container.getContainerProperties()
                    .getTransactionManager();
            tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        };
    }
    

    EDIT2

    You cannot use the binder's DLQ support, because from the container's perspective the delivery was successful. We need the exception to propagate to the container to force a rollback, so you have to move the dead-lettering to the AfterRollbackProcessor instead. Here is my complete test class:

    @SpringBootApplication
    @EnableBinding(Processor.class)
    public class So57379575Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So57379575Application.class, args);
        }
    
        @Autowired
        private MessageChannel output;
    
        @StreamListener(Processor.INPUT)
        public void listen(String in) {
            System.out.println("in:" + in);
            this.output.send(new GenericMessage<>(in.toUpperCase()));
            if (in.equals("two")) {
                throw new RuntimeException("fail");
            }
        }
    
        @KafkaListener(id = "so57379575", topics = "so57379575out")
        public void listen2(String in) {
            System.out.println("out:" + in);
        }
    
        @KafkaListener(id = "so57379575DLT", topics = "so57379575dlt")
        public void listen3(String in) {
            System.out.println("dlt:" + in);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
            return args -> {
                template.send("so57379575in", "one".getBytes());
                template.send("so57379575in", "two".getBytes());
            };
        }
    
        @Bean
        public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
                KafkaTemplate<Object, Object> template) {
    
            return (container, dest, group) -> {
                // enable transaction synchronization
                KafkaTransactionManager<?, ?> tm = (KafkaTransactionManager<?, ?>) container.getContainerProperties()
                        .getTransactionManager();
                tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
                // container dead-lettering
                DefaultAfterRollbackProcessor<? super byte[], ? super byte[]> afterRollbackProcessor =
                        new DefaultAfterRollbackProcessor<>(new DeadLetterPublishingRecoverer(template,
                                (ex, tp) -> new TopicPartition("so57379575dlt", -1)), 0);
                container.setAfterRollbackProcessor(afterRollbackProcessor);
            };
        }
    
    }
    

    spring:
      kafka:
        bootstrap-servers:
        - 10.0.0.8:9092
        - 10.0.0.8:9093
        - 10.0.0.8:9094
        consumer:
          auto-offset-reset: earliest
          enable-auto-commit: false
          properties:
            isolation.level: read_committed
      cloud:
        stream:
          bindings:
            input:
              destination: so57379575in
              group: so57379575in
              consumer:
                max-attempts: 1
            output:
              destination: so57379575out
          kafka:
            binder:
              transaction:
                transaction-id-prefix: so57379575tx.
                producer:
                  configuration:
                    acks: all
                    retries: 10
    
    #logging:
    #  level:
    #    org.springframework.kafka: trace
    #    org.springframework.transaction: trace
    

    in:two
    2019-08-07 12:43:33.457 ERROR 36532 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while 
    ...
    Caused by: java.lang.RuntimeException: fail
    ...
    in:one
    dlt:two
    out:ONE
    

    [Discussion]:

    • Thanks for the reply, Gary. I removed the @Transactional annotation and updated the producer's application.yaml. The first message is still committed after it is sent, so an exception thrown after the MessageChannel.send() call does not roll it back.
    • Do the consumers of those topics have isolation=read_committed? If so, post the debug logs somewhere like a GitHub gist or pastebin. I may not be able to look at it until tomorrow, though.
    • Yes, the consumers are configured that way. I uploaded the debug log to a Gist: gist.github.com/botPotyo/125957dba37a8da642509c2cddfd2297 Thanks for taking a look!
    • That log only seems to contain Tomcat output. But I reproduced the problem; it is a bug - I edited my answer with a workaround.
    • Please see my edit in the original post. We are making progress, but there is still an issue.