【问题标题】:spring-boot-starter-jta-atomikos and spring-boot-starter-batchspring-boot-starter-jta-atomikos 和 spring-boot-starter-batch
【发布时间】:2015-07-07 09:46:51
【问题描述】:

是否可以在一个应用程序中同时使用这两个启动器?

我想将 CSV 文件中的记录加载到数据库表中。 Spring Batch 表存储在不同的数据库中,所以我假设我需要使用 JTA 来处理事务。

每当我将@EnableBatchProcessing 添加到我的@Configuration 类时,它都会配置一个PlatformTransactionManager,从而阻止Atomikos 自动配置。

是否有任何 spring boot + batch + jta 示例可以展示如何做到这一点?

非常感谢, 詹姆斯

【问题讨论】:

    标签: spring-boot spring-batch spring-data-jpa jta atomikos


    【解决方案1】:

    我找到了一个解决方案,我能够保留 @EnableBatchProcessing 但必须实现 BatchConfigurer 和 atomikos bean,请参阅此 so answer 中的完整答案。

    【讨论】:

      【解决方案2】:

      我刚刚经历了这个,我发现了一些似乎有效的东西。正如您所注意到的,@EnableBatchProcessing 会导致创建 DataSourceTransactionManager,这会搞砸一切。我在@EnableBatchProcessing 中使用了modules=true,所以ModularBatchConfiguration 类被激活了。

      我所做的是停止使用@EnableBatchProcessing,而是将整个ModularBatchConfiguration 类复制到我的项目中。然后我注释掉了transactionManager() 方法,因为Atomikos 配置创建了JtaTransactionManager。我还必须重写jobRepository() 方法,因为它被硬编码为使用在DefaultBatchConfiguration 中创建的DataSourceTransactionManager

      我还必须显式导入 JtaAutoConfiguration 类。这会正确连接所有内容(根据执行器的“bean”端点 - 感谢上帝)。但是当你运行它时,事务管理器会抛出一个异常,因为某处设置了明确的事务隔离级别。于是我也写了BeanPostProcessor 找到事务管理器,调用txnMgr.setAllowCustomIsolationLevels(true)

      现在一切正常,但是在作业运行时,我无法使用JdbcTemplate 从 batch_step_execution 表中获取当前数据,即使我可以在 SQLYog 中看到数据。这肯定和事务隔离有关,但是我还没有理解。

      这是我的配置类,从 Spring 复制并如上所述进行了修改。 PS,我有我的DataSource,它指向带有注释为@Primary 的批处理表的数据库。另外,我将DataSource bean 更改为org.apache.tomcat.jdbc.pool.XADataSource 的实例;我不确定这是否有必要。

      @Configuration
      @Import(ScopeConfiguration.class)
      public class ModularJtaBatchConfiguration implements ImportAware 
      {
          @Autowired(required = false)
          private Collection<DataSource> dataSources;
      
          private BatchConfigurer configurer;
      
          @Autowired
          private ApplicationContext context;
      
          @Autowired(required = false)
          private Collection<BatchConfigurer> configurers;
      
          private AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
      
          @Bean
          public JobRepository jobRepository(DataSource batchDataSource, JtaTransactionManager jtaTransactionManager) throws Exception 
          {
              JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
              factory.setDataSource(batchDataSource);
              factory.setTransactionManager(jtaTransactionManager);
              factory.afterPropertiesSet();
              return  factory.getObject();
          }
      
          @Bean
          public JobLauncher jobLauncher() throws Exception {
              return getConfigurer(configurers).getJobLauncher();
          }
      
      //  @Bean
      //  public PlatformTransactionManager transactionManager() throws Exception {
      //      return getConfigurer(configurers).getTransactionManager();
      //  }
      
          @Bean
          public JobExplorer jobExplorer() throws Exception {
              return getConfigurer(configurers).getJobExplorer();
          }
      
          @Bean
          public AutomaticJobRegistrar jobRegistrar() throws Exception {
              registrar.setJobLoader(new DefaultJobLoader(jobRegistry()));
              for (ApplicationContextFactory factory : context.getBeansOfType(ApplicationContextFactory.class).values()) {
                  registrar.addApplicationContextFactory(factory);
              }
              return registrar;
          }
      
          @Bean
          public JobBuilderFactory jobBuilders(JobRepository jobRepository) throws Exception {
              return new JobBuilderFactory(jobRepository);
          }
      
          @Bean
          // hopefully this will autowire the Atomikos JTA txn manager
          public StepBuilderFactory stepBuilders(JobRepository jobRepository, JtaTransactionManager ptm) throws Exception {
              return new StepBuilderFactory(jobRepository, ptm);
          }
      
          @Bean
          public JobRegistry jobRegistry() throws Exception {
              return new MapJobRegistry();
          }
      
          @Override
          public void setImportMetadata(AnnotationMetadata importMetadata) {
              AnnotationAttributes enabled = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(
                      EnableBatchProcessing.class.getName(), false));
              Assert.notNull(enabled,
                      "@EnableBatchProcessing is not present on importing class " + importMetadata.getClassName());
          }
      
          protected BatchConfigurer getConfigurer(Collection<BatchConfigurer> configurers) throws Exception {
              if (this.configurer != null) {
                  return this.configurer;
              }
              if (configurers == null || configurers.isEmpty()) {
                  if (dataSources == null || dataSources.isEmpty()) {
                      throw new UnsupportedOperationException("You are screwed");
                  } else if(dataSources != null && dataSources.size() == 1) {
                      DataSource dataSource = dataSources.iterator().next();
                      DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(dataSource);
                      configurer.initialize();
                      this.configurer = configurer;
                      return configurer;
                  } else {
                      throw new IllegalStateException("To use the default BatchConfigurer the context must contain no more than" +
                                                              "one DataSource, found " + dataSources.size());
                  }
              }
              if (configurers.size() > 1) {
                  throw new IllegalStateException(
                          "To use a custom BatchConfigurer the context must contain precisely one, found "
                                  + configurers.size());
              }
              this.configurer = configurers.iterator().next();
              return this.configurer;
          }
      
      }
      
      @Configuration
      class ScopeConfiguration {
      
          private StepScope stepScope = new StepScope();
      
          private JobScope jobScope = new JobScope();
      
          @Bean
          public StepScope stepScope() {
              stepScope.setAutoProxy(false);
              return stepScope;
          }
      
          @Bean
          public JobScope jobScope() {
              jobScope.setAutoProxy(false);
              return jobScope;
          }
      
      }
      

      【讨论】:

      • 最后,即使这对我来说也没有成功。如果不让 Atomikos JTA Txn Mgr 发疯并锁定并杀死我所有的工作,我就无法查询数据库。然后我意识到我的第二个数据源对于单个作业是只读的,所以我将所有配置恢复为标准的非 JTA 配置,完全取出 Atomikos,并使用 autoCommit= 创建第二个只读数据源作为 Tomcat 数据源池 bean true 并且仅在启动该特定作业时创建它。
      猜你喜欢
      • 1970-01-01
      • 2014-10-30
      • 2014-04-07
      • 2016-01-29
      • 1970-01-01
      • 2017-02-19
      • 2023-04-04
      • 2019-10-28
      • 2019-08-23
      相关资源
      最近更新 更多