【问题标题】:Kafka Transaction Manager sends to Kafka Broker despite transaction rolling back尽管事务回滚,Kafka 事务管理器仍会发送到 Kafka 代理
【发布时间】:2021-05-24 03:06:04
【问题描述】:

尽管交易失败,我的 Kafka Producer 仍继续向 Kafka Broker 发送数据。我有一个自定义侦听器,即我没有使用 @KafkaListener 注释。这是在 Spring-kafka 2.2.x 上运行的

任何想法为什么尽管 KafkaTransactionManager 回滚,消息最终还是在 Kafka 中?下面是我的设置:

// Kafka producer sender
@Transactional(transactionManager = "kafkaTransactionManager", propagation = Propagation.REQUIRED)
public void sendToKafkaWithTransaction(final String topic, final Object payload){
    ProducerRecord<String, Object> record = new ProducerRecord(topic, key, payload);
    template.executeInTransaction(kt -> kt.send(record));
}

// RabbitMQ producer sender
@Transactional(transactionManager = "rabbitTransactionManager", propagation = Propagation.REQUIRED)
public void sendToRabbitmqWithTransaction(final String topic, final String header, final Object payload){
    template.convertAndSend(topic, header, payload);
}

// Chained Transaction Manager
@Bean(name = "chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager(
       @Qualifier(value = "transactionalKafkaProducer") ProducerFactory<String, Object> producerFactory,
       @Qualifier(value = "transactionManager") JpaTransactionManager jpaTransactionManager,
       @Qualifier(value = "rabbitTransactionManager") RabbitTransactionManager rabbitTransactionManager) {
   KafkaTransactionManager producerKtm = new KafkaTransactionManager(producerFactory);
producerKtm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
   return new ChainedKafkaTransactionManager<>(jpaTransactionManager, producerKtm, rabbitTransactionManager);
}


// Listener config
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);



// Listener
@Transactional(transactionManager = "chainedKafkaTransactionManager")
public void onMessage(final ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer){
    
    try {
            RetryState retryState = new DefaultRetryState(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());

            retryTemplate.execute(context -> {
                saveToDb() // This rolls back
                sendToKafkaWithTransaction(topic, payload); // This message gets to Kafa, it should not.
                sendToRabbitmqWithTransaction(topic, payload);  // This rolls back
                throw new Exception("Out of Anger");
                return null;
            }, recoveryCallBack, retryState);

            acknowledgment.acknowledge();
      }
      catch (ListenerExecutionFailedException e) {
         throw e;
      }
}    

// See logs
[ consumer-0-C-1] o.s.a.r.t.RabbitTransactionManager       : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Setting JPA transaction on EntityManager [SessionImpl(104745239<open>)] rollback-only

编辑: 添加 Spring Boot 配置:

spring.kafka:
  admin:
    bootstrap-servers: ${kakfa.host}
  consumer:
    group-id: test-consumers
    client-id: test-consumers
    auto-offset-reset: latest
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    enable-auto-commit: false
    properties:
        isolation-level: read_committed
  producer:
    client-id: test-producer
    acks: all
    retries: 3
    transaction-id-prefix: test-producer-tx-
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      enable.idempotence: true
      transactional.id: tran-id-1-
      max.in.flight.requests.per.connection: 5
      isolation-level: read_committed

编辑 更多日志

[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@18061927] for key [public abstract java.lang.Object org.springframework.data.jpa.repository.JpaRepository.saveAndFlush(java.lang.Object)] from thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord(topic=topic-1, partition=null)
[-27cf188e6c23-1] org.apache.kafka.clients.Metadata        : Cluster ID: r3baK471R6mIft7L_DIOIg
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=topic-1, partition=null)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@16bfeffa] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@30eed725] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Found thread-bound EntityManager [SessionImpl(23309560<open>)] for JPA transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jdbc.datasource.ConnectionHolder@cbfb10d] for key [HikariDataSource (HikariPool-1)] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating in existing transaction
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.arca.framework.messaging.services.impl.BoradcastMessageServiceImpl.sendTransactional]
[-27cf188e6c23-1] o.s.kafka.core.KafkaTemplate             : Sent ok: ProducerRecord(topic=topic-1, partition=null), metadata: topic-1-0@185
[ consumer-0-C-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,4)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$1237/634386320 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64338]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message (Body:'{ }')
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [messaging.services.impl.RabbitMessageServiceImpl.send]
[ consumer-0-C-1] c.a.f.m.k.r.KafkaSingleDispatchReceiver  : Unable to process messages of type: [class messaging.kafka.events.acquiringtmstransaction.TmsTransactionEvent] and id: [92dccb48-2cd2-47b8-b778-8550dcd72d04]
[ consumer-0-C-1] .a.f.m.k.c.KafkaTransactionalRetryPolicy : Retry count [1] for message [{}]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [messaging.kafka.receivers.KafkaReceiver.onMessage] after exception: exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : Applying rules to determine whether transaction should rollback on exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : Winning rollback rule is: null
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : No relevant rollback rule found: applying default rules
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Triggering beforeCompletion synchronization
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
[ consumer-0-C-1] o.s.k.core.DefaultKafkaProducerFactory   : abortTransaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@b70782d, txId=tran-id-1-acquiring-tms-transaction-consumers.pos_txn_log.0]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Triggering afterCompletion synchronization
[ consumer-0-C-1] o.s.a.r.connection.RabbitResourceHolder  : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64338]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Resuming suspended transaction after completion of inner transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Setting JPA transaction on EntityManager [SessionImpl(23309560<open>)] rollback-only
[ consumer-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

【问题讨论】:

    标签: apache-kafka spring-kafka spring-transactions


    【解决方案1】:

    这就是 Kafka 事务的工作方式。发布的记录总是写入日志,后跟一个标记记录,指示事务是提交还是回滚。

    为避免看到回滚记录,您必须将使用者 isolation.level 属性设置为 read_committed(默认为 read_uncommitted)。

    编辑

    这是因为你正在开始一个新的交易:

    template.executeInTransaction(kt -> kt.send(record));
    
    /**
     * Execute some arbitrary operation(s) on the operations and return the result.
     * The operations are invoked within a local transaction and do not participate
     * in a global transaction (if present).
     * @param callback the callback.
     * @param <T> the result type.
     * @return the result.
     * @since 1.1
     */
    @Nullable
    <T> T executeInTransaction(OperationsCallback<K, V, T> callback);
    

    只需调用template.send(),模板就会参与到容器启动的事务中。

    您也可以从该方法中删除@Transactional

    EDIT2

    这对我来说按预期工作......

    spring.kafka.producer.transaction-id-prefix=tx-
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.isolation-level=read-committed
    
    logging.level.org.springframework.transaction=trace
    logging.level.org.springframework.kafka.core=trace
    
    @SpringBootApplication
    @EnableTransactionManagement
    public class So66306109Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66306109Application.class, args);
        }
    
        @Autowired
        Foo foo;
    
        @Transactional
        @KafkaListener(id = "so66306109", topics = "so66306109") // Not really needed; the container has already started it
        public void listen(String in) {
            System.out.println(in);
            this.foo.send(in.toUpperCase());
            throw new RuntimeException("test");
        }
    
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
        }
    
        @Bean
        public NewTopic topic2() {
            return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
        }
    
    
        @KafkaListener(id = "so66306109-2", topics = "so66306109-2")
        public void listen2(String in) {
            System.out.println(in);
        }
    
    }
    
    @Component
    class Foo {
    
        @Autowired
        KafkaTemplate<String, String> template;
    
        @Transactional // Not needed - we're already in a transaction
        void send(String in) {
            this.template.send("so66306109-2", in);
        }
    
    }
    

    EDIT3

    如果无法升级到受支持的版本,则需要禁用容器中的事务,并在代码中自行管理,在重试执行范围内。

    这是一个例子。

    @SpringBootApplication
    @EnableTransactionManagement
    public class So66306109Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66306109Application.class, args);
        }
    
        @Autowired
        Foo foo;
    
        @Autowired
        RetryTemplate template;
    
        @KafkaListener(id = "so66306109", topics = "so66306109") // Not really needed; the container has already started it
        public void listen(ConsumerRecord<String, String> in) {
            this.template.execute(context -> {
                System.out.println(in);
                this.foo.send(in);
                return null;
            }, context -> {
                System.out.println("RETRIES EXHAUSTED");
                return null;
            });
        }
    
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
        }
    
        @Bean
        public NewTopic topic2() {
            return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
        }
    
        @KafkaListener(id = "so66306109-2", topics = "so66306109-2")
        public void listen2(String in) {
            System.out.println(in);
        }
    
        @Bean
        ChainedKafkaTransactionManager<String, String> chainedTm(KafkaTransactionManager<String, String> ktm,
                ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
    
            // transactions can't be started by the container
            factory.getContainerProperties().setTransactionManager(null);
            return new ChainedKafkaTransactionManager<>(ktm);
        }
    
        @Bean
        public RetryTemplate template() {
            return new RetryTemplate();
        }
    
    }
    
    @Component
    class Foo {
    
        @Autowired
        KafkaTemplate<String, String> template;
    
        @Autowired
        ProducerFactory<String, String> pf;
    
        @Transactional("chainedTm")
        public void send(ConsumerRecord<String,String> in) {
            // updateDB
            this.template.send(new ProducerRecord<String, String>("so66306109-2", null, null, in.value().toUpperCase()));
            this.template.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition(in.topic(), in.partition()),
                    new OffsetAndMetadata(in.offset() + 1)));
    
            // simulate a DB rollback
            KafkaResourceHolder<String, String> resource = (KafkaResourceHolder<String, String>) TransactionSynchronizationManager
                    .getResource(this.pf);
            resource.setRollbackOnly();
        }
    
    }
    

    注意;您不得手动确认此类记录;相反,在提交之前将偏移量发送到事务。

    【讨论】:

    • 我已经将生产者和消费者隔离级别设置为 read_committed。请在我上次编辑时查看我的配置。
    • 这是因为您正在开始一项新交易 - 请参阅我的答案的编辑。
    • 我已更改为使用template.send() 行为保持不变。
    • 我刚刚再次对其进行了测试,它按预期工作。我会将我的测试添加到我的答案中。通过引导属性设置时,我的 IDE 更喜欢 read-committed 而不是 read_committed
    • 无论如何,您都不应该在 Kafka 事务中进行重试。您应该将重试要求配置到容器的后回滚处理器中。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-12-10
    • 1970-01-01
    • 2018-12-30
    • 2015-11-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多