【Question Title】: Transaction Synchronization in Spring Boot not working properly
【Posted】: 2019-10-05 17:37:46
【Problem Description】:

Transaction synchronization and rollback are not working properly, and occasionally a ProducerFencedException is thrown. Is there any mistake in my configuration or code?

  1. I have multiple Spring Boot instances
  2. 1 Kafka broker (running in Docker)
  3. Spring Boot version: 2.1.4.RELEASE

Kafka sender configuration

@Configuration
@EnableKafka
public class KafkaSenderConfig{

    @Value("${kafka.servers}")
    private String kafkaServers;

    @Value("${application.name}")
    private String applicationName; 

    @Bean(value = "stringKafkaTransactionManager")
    public KafkaTransactionManager<String, String> kafkaStringTransactionManager() {
        KafkaTransactionManager<String, String> ktm = new KafkaTransactionManager<String, String>(stringProducerFactory());
        ktm.setNestedTransactionAllowed(true);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
        return ktm;
    }
    @Bean(value = "stringProducerFactory")
    @Primary
    public ProducerFactory<String, String> stringProducerFactory() {
        Map<String, Object> config = new ConcurrentHashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);   
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
        defaultKafkaProducerFactory.setTransactionIdPrefix("sample-trans-");
         return defaultKafkaProducerFactory;
    }
    @Bean(value = "stringKafkaTemplate")
    @Primary
    public KafkaTemplate<String, String> stringKafkaTemplate() {
        return new KafkaTemplate<>(stringProducerFactory(),true);
    }
    @Bean(name = "chainedStringKafkaTransactionManager")
    @Primary
    public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(JpaTransactionManager jpaTransactionManager, DataSourceTransactionManager dsTransactionManager) {
        return new ChainedKafkaTransactionManager<>(kafkaStringTransactionManager(), jpaTransactionManager, dsTransactionManager);
    }    
}

Kafka receiver configuration

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    @Value("${kafka.servers}")
    private String kafkaServers;

    @Value("${kafka.groupId}")
    private String groupId;

    @Value("${kafka.retry.maxAttempts}")
    private Integer retryMaxAttempts;

    @Value("${kafka.retry.interval}")
    private Long retryInterval;

    @Value("${kafka.concurrency}")
    private Integer concurrency;

    @Value("${kafka.poll.timeout}")
    private Integer pollTimeout;

    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }
    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }

    @Bean
    public RetryTemplate retryTemplate(){
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(transactionManager);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return props;
    }

    @Bean(name = { "jsonConsumerFactory" })
    public ConsumerFactory<String, Object> jsonConsumerFactory() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean(name = { "kafkaJsonListenerContainerFactory" })
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaJsonListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(jsonConsumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
}

Datasource configuration

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = "com.sample.entity.repository")
public class DatasourceConfig {


    @Bean(name = "dataSourceProperties")
    @ConfigurationProperties("spring.datasource")
    public DataSourceProperties dataSourceProperties() {
        return new DataSourceProperties();
    }   
    @Bean(name = "datasource")
    @Primary
    public DataSource dataSource(@Qualifier("dataSourceProperties") DataSourceProperties properties) {
        return properties.initializeDataSourceBuilder().type(HikariDataSource.class)
                .build();
    }
    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(@Qualifier("datasource") DataSource ds) throws PropertyVetoException {
        LocalContainerEntityManagerFactoryBean entityManagerFactory = new LocalContainerEntityManagerFactoryBean();
        entityManagerFactory.setDataSource(ds);
        entityManagerFactory.setPackagesToScan(new String[]{"com.sample.entity.domain"});
        JpaVendorAdapter jpaVendorAdapter = new HibernateJpaVendorAdapter();
        entityManagerFactory.setJpaVendorAdapter(jpaVendorAdapter);
        return entityManagerFactory;
    }
    @Bean
    public DataSourceTransactionManager dsTransactionManager(@Qualifier("datasource") DataSource ds) {
        return new DataSourceTransactionManager(ds);
    }

    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){
        return jpaTransactionManager(entityManagerFactory);
    }

    @Bean
    public JpaTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory){
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory);
        return transactionManager;
    }

    @Bean
    public JdbcTemplate jdbcTemplate(@Qualifier("datasource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }    
}

Producing messages within a transaction:

  @Autowired
    @Qualifier("stringKafkaTemplate")
    private KafkaTemplate<String, String> stringKafkaTemplate;

      @Autowired
     private EmployeeRepository employeeRepository;

@Override
@Transactional
public void create(List<Employee> employees){
        for (Employee emp : employees) {
              employeeRepository.save(emp);     
             String jsonStr = JsonUtil.toString(emp);
             stringKafkaTemplate.send("employee", jsonStr); 
        }
    }

Receiver

    @KafkaListener(id = "employee", topics = "employee")
    @Transactional(readOnly = false)
    public void processRequest(@Payload String message) throws IOException {

     // it's working fine
    }

Properties file (Kafka configuration)

kafka.servers=localhost:9092
kafka.groupId=xyzabc
kafka.retry.maxAttempts=3
kafka.retry.interval=300000
kafka.concurrency=10
kafka.poll.timeout=1000

【Question Comments】:

    Tags: spring-boot spring-data-jpa spring-kafka


    【Solution 1】:

    It looks like your listener is receiving Employee objects while your producer is creating them - i.e. you are not calling create() from the listener.

    As I said yesterday in a comment on your other question...

    If you produce messages on the listener container thread, the transactional.id is <prefix><group>.<topic>.<partition>. Since a partition cannot be assigned to more than one instance, the transactional.ids will be unique. If you produce messages outside the context of a container thread, the transactional.id (and hence the prefix) must be unique across instances. If you do both, you will need 2 different producer factories.

    @Override
    @Transactional
    public void create(List<Employee> employees){
            for (Employee emp : employees) {
                  employeeRepository.save(emp);     
                 String jsonStr = JsonUtil.toString(emp);
                 stringKafkaTemplate.send("employee", jsonStr); 
            }
        }
    

    So, since your transaction is only on the producer side, your transactionIdPrefix must be unique on each instance.
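
    A minimal sketch (not part of the original answer) of one way to make the prefix unique per instance: derive it from an instance-specific value, here a random UUID generated at startup; a stable instance-id property would work just as well.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.ProducerFactory;

    public class UniquePrefixProducerFactory {

        // Sketch: a producer factory whose transactional.id prefix is unique per
        // application instance, so producer-only transactions started on different
        // instances cannot fence each other.
        public static ProducerFactory<String, String> stringProducerFactory(String kafkaServers) {
            Map<String, Object> config = new HashMap<>();
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            config.put(ProducerConfig.ACKS_CONFIG, "all");
            DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config);
            // Unique per-instance suffix appended to the shared prefix.
            factory.setTransactionIdPrefix("sample-trans-" + UUID.randomUUID() + "-");
            return factory;
        }
    }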

    【Discussion】:

    • Thanks @GaryRussell. It works fine now. But one thing I noticed: if there is occasionally a problem connecting to Kafka, or I delete the topic manually, then an exception is thrown while producing, which is fine. But when a single transaction involves multiple repository saves/updates and multiple messages to Kafka, the database rollback does not happen correctly. Is there anything wrong with the chained Kafka TM code?
    • I don't know; the chained TM simply delegates to the configured TMs. Turn on TRACE logging for org.springframework.transaction and org.springframework.kafka.transaction to see what is going on.
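
    A minimal sketch of the suggested TRACE logging, assuming it is configured through Spring Boot's application.properties:

    # Enable TRACE logging for the transaction infrastructure and Kafka transactions
    logging.level.org.springframework.transaction=TRACE
    logging.level.org.springframework.kafka.transaction=TRACE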