【问题标题】:Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transactionSpring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步
【发布时间】:2020-03-07 08:17:00
【问题描述】:

我阅读了大量 Gary Russell 的答案和帖子,但没有找到以下序列同步常见用例的实际解决方案:

recieve from topic A => save to DB via Spring-data => send to topic B

据我了解:在这种情况下无法保证完全原子处理,我需要在客户端处理消息重复数据删除,但主要问题是 ChainedKafkaTransactionManager 不与 JpaTransactionManager 同步强>(见下文@KafkaListener

卡夫卡配置:

@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);

    @Bean
    public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        props.put(AUTO_OFFSET_RESET_CONFIG, 'earliest');
        props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(MAX_POLL_RECORDS_CONFIG, 10);
        props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
        props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
        props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
        props.put(ISOLATION_LEVEL_CONFIG, 'read_committed');

        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
            @Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
            @Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
            @Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
            @Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency
    ) {

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTM);

        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));
        arbp.setCommitRecovered(true);
        arbp.setKafkaTemplate(kafkaTemplate);

        factory.setAfterRollbackProcessor(arbp);
        factory.setConcurrency(concurrency);

        factory.afterPropertiesSet();

        return factory;
    }

    @Bean
    public ProducerFactory<String, byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> configProps = new HashMap<>();

        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        configProps.put(BATCH_SIZE_CONFIG, 16384);
        configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);

        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
        kafkaProducerFactory.setTransactionIdPrefix('kafka-tx-');

        return kafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
        ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

    @Bean
    public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
                                                         KafkaTransactionManager kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);
    }

    @Bean(name = "transactionManager")
    public JpaTransactionManager transactionManager(EntityManagerFactory em) {
        return new JpaTransactionManager(em);
    }
}

卡夫卡监听器:

@KafkaListener(groupId = "${group.id}", idIsGroup = false, topics = "${topic.name.import}")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset) {
    for (byte[] record : records) {
        // cause infinity rollback (perhaps due to batch listener)
        if (true)
            throw new RuntimeExcetion("foo");

        // spring-data storage with @Transactional("chainedKafkaTM"), since Spring-data can't determine TM among transactionManager, chainedKafkaTM, kafkaTransactionManager
        var result = storageService.persist(record);

        kafkaTemplate.send(result);
    }
}

Spring-kafka 版本:2.3.3 春季启动版本:2.2.1

实现这种用例的正确方法是什么? Spring-kafka 文档仅限于小型/特定示例。

P.s.当我在 @KafkaListener 方法上使用 @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) 时,我面临无限循环回滚,但是设置了 FixedBackOff(1000L, 3L)

编辑:我计划通过可配置的重试次数在侦听器、生产者和数据库之间实现最大负担得起的同步。

编辑: 上面的代码 sn-ps 已根据建议的配置进行了编辑。使用 ARBP 并不能解决我的无限回滚循环,因为第一条语句的谓词始终为 false (SeekUtils.doSeeks):

DefaultAfterRollbackProcessor
...
@Override
    public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
            boolean recoverable) {

        if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
                getSkipPredicate((List) records, exception), LOGGER)
                    && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
            ConsumerRecord<K, V> skipped = records.get(0);
            this.kafkaTemplate.sendOffsetsToTransaction(
                    Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                            new OffsetAndMetadata(skipped.offset() + 1)));
        }
    }

值得一提的是,Kafka Consumer方法(TransactionSynchronizationManager.isActualTransactionActive())中没有活跃的事务。

【问题讨论】:

    标签: java spring-boot apache-kafka spring-data spring-kafka


    【解决方案1】:

    是什么让您认为它不同步?你真的不需要@Transactional,因为容器会启动两个事务。

    您不应该在事务中使用SeekToCurrentErrorHandler,因为这发生在事务中。改为配置后回滚处理器。默认 ARBP 使用 FixedBackOff(0L, 9)(10 次尝试)。

    这对我来说很好;并在 4 次交付尝试后停止:

    @SpringBootApplication
    public class So58804826Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So58804826Application.class, args);
        }
    
        @Bean
        public JpaTransactionManager transactionManager() {
            return new JpaTransactionManager();
        }
    
    
        @Bean
        public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
                KafkaTransactionManager<?, ?> kafka) {
    
            kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
            return new ChainedKafkaTransactionManager<>(kafka, jpa);
        }
    
        @Autowired
        private Saver saver;
    
        @KafkaListener(id = "so58804826", topics = "so58804826")
        public void listen(String in) {
            System.out.println("Storing: " + in);
            this.saver.save(in);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so58804826")
                    .partitions(1)
                    .replicas(1)
                    .build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
    //          template.executeInTransaction(t -> t.send("so58804826", "foo"));
            };
        }
    
    }
    
    @Component
    class ContainerFactoryConfigurer {
    
        ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
                ChainedKafkaTransactionManager<?, ?> tm) {
    
            factory.getContainerProperties().setTransactionManager(tm);
            factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(1000L, 3)));
        }
    
    }
    
    @Component
    class Saver {
    
        @Autowired
        private MyEntityRepo repo;
    
        private final AtomicInteger ids = new AtomicInteger();
    
        @Transactional("chainedTxM")
        public void save(String in) {
            this.repo.save(new MyEntity(in, this.ids.incrementAndGet()));
            throw new RuntimeException("foo");
        }
    
    }
    

    我从两个 TxM 中看到“参与现有交易”。

    使用@Transactional("transactionManager"),我只是从 JPATm 中得到它,正如人们所期望的那样。

    编辑

    批处理侦听器没有“恢复”的概念——框架不知道批处理中的哪条记录需要被跳过。在 2.3 中,我们为使用 MANUAL ack 模式时的批处理侦听器添加了一项新功能。

    Committing Offsets

    从 2.3 版本开始,Acknowledgment 接口有两个额外的方法 nack(long sleep) 和 nack(int index, long sleep)。第一个用于记录侦听器,第二个用于批处理侦听器。为您的侦听器类型调用错误的方法将引发 IllegalStateException。

    使用批处理侦听器时,您可以指定批处理中发生故障的索引。当调用nack() 时,将为索引之前的记录提交偏移量,并在分区上为失败和丢弃的记录执行查找,以便在下一次 poll() 时重新传递它们。这是对 SeekToCurrentBatchErrorHandler 的改进,SeekToCurrentBatchErrorHandler 只能寻找整个批次进行重新投递。

    但是,失败的记录仍然会无限期地重播。

    您可以跟踪不断失败的记录,然后点击index + 1 跳过它。

    但是,由于您的 JPA tx 已回滚;这对你不起作用。

    使用批处理侦听器,您必须在侦听器代码中处理批处理问题。

    【讨论】:

    • 哇,非常感谢!!!我今晚去看看!但在我的示例中,记录在抛出 RuntimeException 之前就已保留。你在“你真的不需要@Transactional”下是什么意思——我不需要在存储库或消费者上使用@Transactional?哦,对不起,现在我明白了:我不需要消费者/侦听器上的事务,但我需要使用 ChainedTM 配置 Spring-data,而不仅仅是普通的 JpaTransactionManager,对吗?
    • 否;我说@Transactional("chainedTxM") 是多余的,因为侦听器容器在调用侦听器之前启动事务。它没有伤害(因为我们得到Participating in existing transaction),但它是不必要的。
    • 我检查了设置,我遇到了两个问题: 1. Spring-data 无法在 3 TM (kafka, chain, jpa) 之间做出决定,所以它不会启动,所以我需要设置@Transactional("chainedTM") 在我的存储库类上,2. 如果在 listen 方法的开头(在任何模板或存储库使用之前)抛出运行时异常,它会无限循环并且 ARBP 不起作用。
    • 在无限循环的情况下,无论ARBP如何,它总是寻求相同的偏移量:INFO Oakclients.consumer.KafkaConsumer - ....寻求偏移量 1...请看第二个编辑回答。
    • 这没有意义;我的例子不管有没有@Transactional都可以正常工作。在某处发布调试日志以显示您所看到的行为。将您的代码与我的代码进行比较。
    猜你喜欢
    • 2021-12-13
    • 2018-05-01
    • 2018-08-07
    • 1970-01-01
    • 2019-10-03
    • 2017-07-17
    • 2017-05-09
    • 2014-11-05
    • 1970-01-01
    相关资源
    最近更新 更多