【问题标题】:Spring Batch thread-safe Map job repositorySpring Batch 线程安全的 Map 作业存储库
【发布时间】:2016-02-18 14:45:21
【问题描述】:

Spring Batch docs 表示 Map 支持的作业存储库:

请注意,内存存储库是易失性的,因此不允许在 JVM 实例之间重新启动。它也不能保证两个具有相同参数的作业实例同时启动,并且不适合在多线程作业或本地分区的 Step 中使用。因此,只要您需要这些功能,就可以使用存储库的数据库版本。

我想使用 Map 作业存储库,我不关心重新启动、防止并发作业执行等,但我确实关心能够使用多线程和本地分区。

我的批处理应用程序有一些分区步骤,乍一看,它似乎与 Map 支持的作业存储库一起运行得很好。

它说 MapJobRepositoryFactoryBean 不可能的原因是什么?查看 Map DAO 的实现,他们正在使用 ConcurrentHashMap。这不是线程安全的吗?

【问题讨论】:

    标签: multithreading spring-batch partitioning


    【解决方案1】:

    我建议您遵循文档,而不是依赖实施细节。即使这些映射是单独线程安全的,在更改中也可能存在竞争条件,而不是涉及多个这些映射。

    您可以非常轻松地使用内存数据库。示例

    @Grapes([
            @Grab('org.springframework:spring-jdbc:4.0.5.RELEASE'),
            @Grab('com.h2database:h2:1.3.175'),
            @Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE'),
            // must be passed with -cp, for whatever reason the GroovyClassLoader
            // is not used for com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver
            //@Grab('org.codehaus.jettison:jettison:1.2'),
    ])
    import org.h2.jdbcx.JdbcDataSource
    import org.springframework.batch.core.Job
    import org.springframework.batch.core.JobParameters
    import org.springframework.batch.core.Step
    import org.springframework.batch.core.StepContribution
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
    import org.springframework.batch.core.launch.JobLauncher
    import org.springframework.batch.core.scope.context.ChunkContext
    import org.springframework.batch.core.step.tasklet.Tasklet
    import org.springframework.batch.repeat.RepeatStatus
    import org.springframework.beans.factory.annotation.Autowired
    import org.springframework.context.annotation.AnnotationConfigApplicationContext
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.core.io.ResourceLoader
    import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils
    import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator
    
    import javax.annotation.PostConstruct
    import javax.sql.DataSource
    
    @Configuration
    @EnableBatchProcessing
    class AppConfig {
    
        @Autowired
        private JobBuilderFactory jobs
    
        @Autowired
        private StepBuilderFactory steps
    
        @Bean
        public Job job() {
            return jobs.get("myJob").start(step1()).build()
        }
    
        @Bean
        Step step1() {
            this.steps.get('step1')
                .tasklet(new MyTasklet())
                .build()
        }
    
        @Bean
        DataSource dataSource() {
            new JdbcDataSource().with {
                url = 'jdbc:h2:mem:temp_db;DB_CLOSE_DELAY=-1'
                user = 'sa'
                password = 'sa'
                it
            }
        }
    
        @Bean
        BatchSchemaPopulator batchSchemaPopulator() {
            new BatchSchemaPopulator()
        }
    }
    
    class BatchSchemaPopulator {
        @Autowired
        ResourceLoader resourceLoader
    
        @Autowired
        DataSource dataSource
    
        @PostConstruct
        void init() {
            def populator = new ResourceDatabasePopulator()
            populator.addScript(
                    resourceLoader.getResource(
                            'classpath:/org/springframework/batch/core/schema-h2.sql'))
            DatabasePopulatorUtils.execute populator, dataSource
        }
    }
    
    class MyTasklet implements Tasklet {
    
        @Override
        RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            println 'TEST!'
        }
    }
    
    def ctx = new AnnotationConfigApplicationContext(AppConfig)
    def launcher = ctx.getBean(JobLauncher)
    def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
    println "Status is: ${jobExecution.status}"
    

    【讨论】:

    • 好的。我在想 Map 可能会表现得更好,因为它不是事务性的,而内存数据库是(你知道这是不是真的吗?)。无论如何,就像你说的那样,最好谨慎行事。谢谢!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多