【问题标题】:Spring Batch CompositeItemWriter Transaction Rollback Issue(Spring Batch CompositeItemWriter 事务回滚问题)
【发布时间】:2015-11-09 06:58:36
【问题描述】:

大家好,我在使用 Spring Batch 的 CompositeItemWriter 时遇到了事务回滚问题。使用的 Spring Batch 版本是 2.2.7。

我的配置与此处的帖子相同:https://stackoverflow.com/questions/32432519/compositeitemwriter-doesnt-roll-back-in-spring-batch

<!-- Composite writer bean: its 'delegates' property is a list that references the
     firstItemWriter, secondItemWriter and thirdItemWriter beans (their definitions
     are not shown in this snippet). -->
<bean id="compositeItemWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
    <property name="delegates">
    <list>
        <ref bean="firstItemWriter"/>
        <ref bean="secondItemWriter"/>
        <ref bean="thirdItemWriter"/>
    </list>
    </property>
</bean>

<!-- JDBC transaction manager bean; its 'dataSource' property references the
     'dataSource' bean. -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" >
    <property name="dataSource" ref="dataSource"/>
</bean> 

<!-- Commons DBCP data source. Driver class, URL, user name and password are taken
     from the batch.jdbc.* property placeholders.
     NOTE(review): no auto-commit related property is set here, so whatever the
     pool/driver default is will apply; confirm that this is intended. -->
<bean id="dataSource"
    class="org.apache.commons.dbcp.BasicDataSource">
    <property name="driverClassName" value="${batch.jdbc.driver}" />
    <property name="url" value="${batch.jdbc.url}" />
    <property name="username" value="${batch.jdbc.user}" />
    <property name="password" value="${batch.jdbc.password}" />
</bean>

当 thirdItemWriter 抛出异常时,前面的 writer 已经写入数据库的数据不会回滚。

我不确定自己遗漏了什么,但按我的理解,CompositeItemWriter 的各个委托 writer 共享同一个事务,一旦发生异常,所有数据都应该回滚。

欢迎提出任何建议

【问题讨论】:

    标签: java spring spring-data


    【解决方案1】:

    经过几天的努力,终于找到了一个解决方案。关键是把所有 writer 放在同一个事务中,并关闭自动提交。

    /**
     * A {@link CompositeItemWriter} variant that invokes all of its delegates inside one
     * transaction obtained from the configured {@link PlatformTransactionManager}, with the
     * JDBC connection of the configured {@link JdbcTemplate}'s data source switched to
     * manual commit for the duration of the call.
     *
     * <p>If any delegate throws, the transaction is rolled back and the exception is
     * rethrown; otherwise the transaction is committed.
     *
     * <p>{@code transactionManager}, {@code jdbcTemplate} and {@code delegates} must be set
     * before {@link #write(List)} is called.
     *
     * @param <T> the item type handled by the delegates
     */
    public class TransactionAwareCompositeItemWritter<T> extends CompositeItemWriter<T> {
        private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareCompositeItemWritter.class);

        /** Local copy of the delegates so that {@link #write(List)} can iterate over them. */
        private List<ItemWriter<? super T>> delegates;

        private PlatformTransactionManager transactionManager;

        private JdbcTemplate jdbcTemplate;

        /**
         * Writes the items with every delegate, in list order, inside a single transaction.
         *
         * <p>Auto-commit is turned off on the connection only if it was on, and it is turned
         * back on only if this method turned it off. Re-enabling auto-commit on a connection
         * that was already in manual-commit mode would commit whatever transaction is in
         * progress on it (see {@link Connection#setAutoCommit(boolean)}), so that case is
         * left untouched. The restore and the connection release happen in {@code finally}
         * blocks so they also run when a delegate or the commit fails.
         *
         * @param item the items to hand to each delegate
         * @throws Exception whatever a delegate, the transaction manager or the JDBC
         *         connection throws
         */
        @Override
        public void write(final List<? extends T> item) throws Exception {

            Connection con = DataSourceUtils.getConnection(jdbcTemplate.getDataSource());
            boolean switchedToManualCommit = false;

            try {
                if (con.getAutoCommit()) {

                    LOG.debug("Switching JDBC Connection [" + con + "] to manual commit");

                    con.setAutoCommit(false);
                    switchedToManualCommit = true;
                }

                writeInTransaction(item);
            } finally {
                try {
                    if (switchedToManualCommit) {

                        LOG.debug("Switching JDBC Connection [" + con + "] to auto commit");

                        con.setAutoCommit(true);
                    }
                } finally {
                    // Pairs with DataSourceUtils.getConnection above so the connection is
                    // handed back even when restoring auto-commit fails.
                    DataSourceUtils.releaseConnection(con, jdbcTemplate.getDataSource());
                }
            }
        }

        /** Runs all delegates under one PROPAGATION_REQUIRED transaction; rolls back on failure. */
        private void writeInTransaction(final List<? extends T> item) throws Exception {
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setName("CompositeItemWriter Transaction");
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

            TransactionStatus status = transactionManager.getTransaction(def);
            try {
                for (ItemWriter<? super T> writer : delegates) {
                    writer.write(item);
                }
            } catch (Exception e) {
                transactionManager.rollback(status);
                throw e;
            }

            transactionManager.commit(status);
        }

        /**
         * Sets the delegates on the superclass and keeps a reference for {@link #write(List)}.
         *
         * @param delegates the writers to call, in order
         */
        public void setDelegates(List<ItemWriter<? super T>> delegates) {
            super.setDelegates(delegates);
            this.delegates = delegates;
        }

        /** @return the transaction manager used to demarcate the write transaction */
        public PlatformTransactionManager getTransactionManager() {
            return transactionManager;
        }

        /** @param transactionManager the transaction manager used to demarcate the write transaction */
        public void setTransactionManager(PlatformTransactionManager transactionManager) {
            this.transactionManager = transactionManager;
        }

        /** @return the template whose data source supplies the JDBC connection */
        public JdbcTemplate getJdbcTemplate() {
            return jdbcTemplate;
        }

        /** @param jdbcTemplate the template whose data source supplies the JDBC connection */
        public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }

    }

    【讨论】:

    【解决方案2】:

    自定义一个 CompositeItemWriter,并在其 write 方法上添加 '@Transactional';这种做法在我的代码中是有效的。

    package com.krafton.intraplatform.batch.job.step;
    
    import java.util.Iterator;
    import java.util.List;
    
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemStream;
    import org.springframework.batch.item.ItemStreamException;
    import org.springframework.batch.item.ItemStreamWriter;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.util.Assert;
    
    public class CustomCompositeWriter<T> implements ItemStreamWriter<T>, InitializingBean {
      private List<ItemWriter<? super T>> delegates;
      private boolean ignoreItemStream = false;
    
      public CustomCompositeWriter() {
      }
    
      public void setIgnoreItemStream(boolean ignoreItemStream) {
        this.ignoreItemStream = ignoreItemStream;
      }
    
    
      @Transactional
      public void write(List<? extends T> item) throws Exception {
        Iterator var2 = this.delegates.iterator();
    
        while(var2.hasNext()) {
          ItemWriter<? super T> writer = (ItemWriter)var2.next();
          writer.write(item);
        }
    
      }
    
      public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.delegates, "The 'delegates' may not be null");
        Assert.notEmpty(this.delegates, "The 'delegates' may not be empty");
      }
    
      public void setDelegates(List<ItemWriter<? super T>> delegates) {
        this.delegates = delegates;
      }
    
      public void close() throws ItemStreamException {
        Iterator var1 = this.delegates.iterator();
    
        while(var1.hasNext()) {
          ItemWriter<? super T> writer = (ItemWriter)var1.next();
          if (!this.ignoreItemStream && writer instanceof ItemStream) {
            ((ItemStream)writer).close();
          }
        }
    
      }
    
      public void open(ExecutionContext executionContext) throws ItemStreamException {
        Iterator var2 = this.delegates.iterator();
    
        while(var2.hasNext()) {
          ItemWriter<? super T> writer = (ItemWriter)var2.next();
          if (!this.ignoreItemStream && writer instanceof ItemStream) {
            ((ItemStream)writer).open(executionContext);
          }
        }
    
      }
    
      public void update(ExecutionContext executionContext) throws ItemStreamException {
        Iterator var2 = this.delegates.iterator();
    
        while(var2.hasNext()) {
          ItemWriter<? super T> writer = (ItemWriter)var2.next();
          if (!this.ignoreItemStream && writer instanceof ItemStream) {
            ((ItemStream)writer).update(executionContext);
          }
        }
    
      }
    }
    

    【讨论】:

    猜你喜欢
    • 2019-01-25
    • 2011-10-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-02
    • 2011-11-21
    • 2020-08-06
    • 1970-01-01
    相关资源
    最近更新 更多