【Question title】:How to work around an InvalidIsolationLevelException in Spring Batch?
【Posted】:2019-10-25 19:36:03
【Question description】:

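I am using Spring Batch, and I get an InvalidIsolationLevelException when I run a batch job. The job works when I use MapJobRepositoryFactoryBean, but I need the BATCH_* tables to be created in the database.

The application already has a transaction manager configured.

My Spring version is 4.3.18.RELEASE, and I am using an Oracle SQL database.

Here is my batch configuration: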

@Configuration
@ComponentScan({"com..."})
public class BatchConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchConfig.class);

    private static final String REPORTS_STEP_NAME = "reportStep";
    private static final String REPORTS_JOB_NAME = "reportsJob";
    private static final int REPORTS_STEP_CHUNK_SIZE = 20;
    private static final String MAX_END_DATE = "2999-12-31 00:00:00";
    private static final String MAX_END_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String END_DATE_PARAM = "endDate";
    private static final String REPORT_STATUS_PARAM = "reportSend";
    private static final YesNo NO = new YesNo("N");

    @Autowired
    private ApplicationConfig applicationConfig;

    @Autowired
    private ReportProcessor reportProcessor;

    @Autowired
    private ReportItemWriter reportItemWriter;

    @Bean
    public JobBuilderFactory jobBuilderFactory() throws Exception {
        JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository());
        return jobBuilderFactory;
    }

    @Bean
    public StepBuilderFactory stepBuilderFactory() throws Exception {
        StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository(), applicationConfig.transactionManager());
        return stepBuilderFactory;
    }

    @Bean
    public PlatformTransactionManager transactionManager() {
        JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
        jpaTransactionManager.setDataSource(dataSource());
        jpaTransactionManager.setEntityManagerFactory(applicationConfig.entityManagerFactory().getObject());
        return jpaTransactionManager;
    }

    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    @Bean
    public JobExplorer jobExplorer() throws Exception {
        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
        jobExplorerFactoryBean.setDataSource(dataSource());
        jobExplorerFactoryBean.afterPropertiesSet();
        return jobExplorerFactoryBean.getObject();
    }

    @Bean
    public JobOperator jobOperator() throws Exception {
        SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobExplorer(jobExplorer());
        jobOperator.setJobRegistry(jobRegistry());
        jobOperator.setJobLauncher(jobLauncher());
        jobOperator.setJobRepository(jobRepository());
        return jobOperator;
    }

    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("...");
        dataSource.setUrl("...");
        dataSource.setUsername("...");
        dataSource.setPassword("...");
        return dataSource;
    }

    @Bean
    public JobRepository jobRepository() throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource());
        jobRepositoryFactoryBean.setTransactionManager(transactionManager());
        jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository());
        return simpleJobLauncher;
    }

    public JpaPagingItemReader<FishingVessel> readFishingVesselRecords() throws ParseException {
        LOGGER.info("Inside JpaPagingItemReader");
        JpaPagingItemReader<FishingVessel> fishingVesselsReader = new JpaPagingItemReader<>();
        fishingVesselsReader.setEntityManagerFactory(applicationConfig.entityManagerFactory().getObject());
        fishingVesselsReader.setQueryString(FishingVessel.FISHING_VESSEL_REPORTS);
        fishingVesselsReader.setTransacted(false);
        fishingVesselsReader.setSaveState(false);
        Map<String, Object> params = createQueryParams();
        fishingVesselsReader.setParameterValues(params);
        fishingVesselsReader.setPageSize(100);
        return fishingVesselsReader;
    }

    private Map<String, Object> createQueryParams() throws ParseException {
        Map<String, Object> params = new HashMap<>();
        DateFormatter dateFormatter = new DateFormatter(MAX_END_DATE_FORMAT);
        Date date = dateFormatter.parse(MAX_END_DATE, Locale.ENGLISH);
        params.put(END_DATE_PARAM, date);
        //params.put(REPORT_STATUS_PARAM, NO);
        return params;
    }

    @Bean
    public Step reportsStep() throws Exception {
        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
        attribute.setPropagationBehavior(Propagation.REQUIRED.value());
        attribute.setIsolationLevel(Isolation.SERIALIZABLE.value());
        attribute.setTimeout(30);

        return stepBuilderFactory()
                .get(REPORTS_STEP_NAME)
                .<FishingVessel, FishingVessel>chunk(REPORTS_STEP_CHUNK_SIZE)
                .reader(readFishingVesselRecords())
                .processor(reportProcessor)
                .writer(reportItemWriter)
                .transactionAttribute(attribute)
                .build();
    }

    @Bean
    public Job reportsJob() throws Exception {
        return jobBuilderFactory()
                .get(REPORTS_JOB_NAME)
                .start(reportsStep())
                .build();
    }
}

Here is the application configuration that already exists in the application:

@Configuration
@ComponentScan({
    "com....components",
    "com....services",
    "com....converters",
    "com....controllers",
    "com....fluxFolder",
    "com....processors",
    "com....flux.service",
    "com....flux.gateway"
})
@EnableJpaRepositories("com....repositories")
@EnableTransactionManagement
@EnableJpaAuditing
@EnableWebMvc
@Import({SecurityConfig.class})
@PropertySource("classpath:application.properties")
public class ApplicationConfig {
    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory() {

        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setPersistenceXmlLocation("classpath:/META-INF/persistence.xml");
        return factory;
    }

    @Bean
    @Primary
    public PlatformTransactionManager transactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory().getObject());
        return transactionManager;
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    @Bean
    public ViewResolver viewResolver() {
        UrlBasedViewResolver viewResolver = new UrlBasedViewResolver();
        viewResolver.setViewClass(JsfView.class);
        viewResolver.setSuffix(".xhtml");
        return viewResolver;
    }
}

Here is the controller where I start the batch job:

@RestController
public class BatchController {
    private static final String JOB_ID = "JobID";

    private JobExecution jobExecution;

    @Autowired
    private JobOperator jobOperator;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job reportsJob;

    @Autowired
    private JobExplorer jobExplorer;

    @GetMapping("/batch/enable")
    public String batchTest() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString(JOB_ID, String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        JobInstance job;
        jobExecution = new JobExecution(jobLauncher.run(reportsJob, jobParameters));
        return "batch job started";
    }

    @GetMapping("/batch/disable")
    public String disable() {
        String jobName = jobExecution.getJobInstance().getJobName();
        Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(jobName);
        for (JobExecution je : jobExecutions) {
            try {
                jobOperator.stop(je.getId());
            } catch (NoSuchJobExecutionException e) {

            } catch (JobExecutionNotRunningException e) {

            }
        }
        return jobName + " disabled.";
    }
}

Here is the stack trace I get:

14:01:30,945 ERROR [io.undertow.request] (default task-1) UT005023: Exception handling request to /.../batch/enable: org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.springframework.transaction.InvalidIsolationLevelException: DefaultJpaDialect does not support custom isolation levels due to limitations in standard JPA. Specific arrangements may be implemented in custom JpaDialect variants.
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:982)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at io.undertow.servlet.handlers.ServletHandler.handleRequest(ServletHandler.java:86)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:130)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:60)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:132)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:317)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.session.ConcurrentSessionFilter.doFilter(ConcurrentSessionFilter.java:155)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:66)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:214)
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:177)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:347)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:263)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:60)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:132)
    at io.undertow.servlet.handlers.FilterHandler.handleRequest(FilterHandler.java:85)
    at io.undertow.servlet.handlers.security.ServletSecurityRoleHandler.handleRequest(ServletSecurityRoleHandler.java:62)
    at io.undertow.servlet.handlers.ServletDispatchingHandler.handleRequest(ServletDispatchingHandler.java:36)
    at org.wildfly.extension.undertow.security.SecurityContextAssociationHandler.handleRequest(SecurityContextAssociationHandler.java:78)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.servlet.handlers.security.SSLInformationAssociationHandler.handleRequest(SSLInformationAssociationHandler.java:131)
    at io.undertow.servlet.handlers.security.ServletAuthenticationCallHandler.handleRequest(ServletAuthenticationCallHandler.java:57)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.security.handlers.AbstractConfidentialityHandler.handleRequest(AbstractConfidentialityHandler.java:46)
    at io.undertow.servlet.handlers.security.ServletConfidentialityConstraintHandler.handleRequest(ServletConfidentialityConstraintHandler.java:64)
    at io.undertow.security.handlers.AuthenticationMechanismsHandler.handleRequest(AuthenticationMechanismsHandler.java:58)
    at io.undertow.servlet.handlers.security.CachedAuthenticatedSessionHandler.handleRequest(CachedAuthenticatedSessionHandler.java:72)
    at io.undertow.security.handlers.NotificationReceiverHandler.handleRequest(NotificationReceiverHandler.java:50)
    at io.undertow.security.handlers.SecurityInitialHandler.handleRequest(SecurityInitialHandler.java:76)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at org.wildfly.extension.undertow.security.jacc.JACCContextIdHandler.handleRequest(JACCContextIdHandler.java:61)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.servlet.handlers.ServletInitialHandler.handleFirstRequest(ServletInitialHandler.java:282)
    at io.undertow.servlet.handlers.ServletInitialHandler.dispatchRequest(ServletInitialHandler.java:261)
    at io.undertow.servlet.handlers.ServletInitialHandler.access$000(ServletInitialHandler.java:80)
    at io.undertow.servlet.handlers.ServletInitialHandler$1.handleRequest(ServletInitialHandler.java:172)
    at io.undertow.server.Connectors.executeRootHandler(Connectors.java:199)
    at io.undertow.server.HttpServerExchange$1.run(HttpServerExchange.java:774)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.transaction.InvalidIsolationLevelException: DefaultJpaDialect does not support custom isolation levels due to limitations in standard JPA. Specific arrangements may be implemented in custom JpaDialect variants.
    at org.springframework.orm.jpa.DefaultJpaDialect.beginTransaction(DefaultJpaDialect.java:63)
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:380)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:377)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:461)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:277)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy419.getLastJobExecution(Unknown Source)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:98)
    at com....controllers.BatchController.batchTest(BatchController.java:42)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:967)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:901)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
    ... 71 more

【Comments】:

    Tags: spring transactions spring-batch


    【Solution 1】:

    You can change the isolation level set on the JobRepositoryFactoryBean to the default:

    @Bean
    public JobRepository jobRepository() throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource());
        jobRepositoryFactoryBean.setTransactionManager(transactionManager());
        jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_DEFAULT");
        return jobRepositoryFactoryBean.getObject();
    }
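
    To see why switching the string to ISOLATION_DEFAULT avoids the exception, here is a minimal sketch in plain Java. It is a simplified stand-in, not Spring's source code: the class IsolationLevelModel and its methods are hypothetical names made up for illustration. It assumes only that the isolation names map to the same numeric values as the java.sql.Connection constants, with -1 for the default level, and that a plain JPA dialect rejects anything other than the default, as the exception message states.

```java
import java.sql.Connection;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Simplified stand-in for the isolation check described by the exception
 * message. Hypothetical illustration only; this is NOT Spring's source code.
 */
final class IsolationLevelModel {

    /** Same numeric value as Spring's TransactionDefinition.ISOLATION_DEFAULT. */
    static final int ISOLATION_DEFAULT = -1;

    /** Isolation names as passed to setIsolationLevelForCreate, with their numeric levels. */
    private static final Map<String, Integer> LEVELS = new LinkedHashMap<>();
    static {
        LEVELS.put("ISOLATION_DEFAULT", ISOLATION_DEFAULT);
        LEVELS.put("ISOLATION_READ_UNCOMMITTED", Connection.TRANSACTION_READ_UNCOMMITTED);
        LEVELS.put("ISOLATION_READ_COMMITTED", Connection.TRANSACTION_READ_COMMITTED);
        LEVELS.put("ISOLATION_REPEATABLE_READ", Connection.TRANSACTION_REPEATABLE_READ);
        LEVELS.put("ISOLATION_SERIALIZABLE", Connection.TRANSACTION_SERIALIZABLE);
    }

    /** Resolves a name such as "ISOLATION_SERIALIZABLE" to its numeric level. */
    static int resolve(String name) {
        Integer level = LEVELS.get(name);
        if (level == null) {
            throw new IllegalArgumentException("Unknown isolation level: " + name);
        }
        return level;
    }

    /** Models the rule from the message: a plain JPA dialect accepts only the default level. */
    static boolean acceptedByPlainJpaDialect(String name) {
        return resolve(name) == ISOLATION_DEFAULT;
    }

    public static void main(String[] args) {
        // Print, for every known name, whether the modelled dialect would accept it.
        for (String name : LEVELS.keySet()) {
            System.out.println(name + " -> accepted: " + acceptedByPlainJpaDialect(name));
        }
    }
}
```

    Note that the step in the question also sets Isolation.SERIALIZABLE on its transaction attribute, so it would likely hit the same check once the job repository call gets through; leaving that attribute at the default level is probably needed as well. If a non-default level really is required, the exception message points to a custom JpaDialect. When the JPA provider is Hibernate (an assumption, the question does not say), Spring's HibernateJpaDialect is one such variant.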
    

    【Discussion】:
