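Spring Batch is a framework for processing large volumes of data: it reads a large data set, applies some processing, and writes the result out in a specified form. For example, it can batch-insert the rows of a CSV file into a database (millions or even tens of millions of rows are not a problem). Yet in both books and online material I have rarely seen this explained in detail. So the main purpose of this post is to explain and practice side by side (all the code here has been run in practice). As before, we start with Spring Boot's support for the Batch framework and then work through the code step by step!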
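I. Spring Boot support for the Batch framework
1. Components of the Spring Batch framework
1) JobRepository: the container in which Jobs are registered; it stores job metadata, and the database-related properties are set on it.
2) JobLauncher: the interface used to launch a Job.
3) Job: the task we actually want to execute; it contains one or more Steps.
4) Step: a single step, made up of ItemReader -> ItemProcessor -> ItemWriter.
5) ItemReader: reads the data and maps data fields to an entity class. For example, it reads person records from a CSV file and maps them to the fields of the Person entity.
6) ItemProcessor: the interface for processing data. It can also validate data (set a validator and use JSR-303 (hibernate-validator) annotations). For example, it converts the Chinese gender values 男/女 to M/F and checks that the age field meets the requirements.
7) ItemWriter: the interface for outputting data. Set the data source on it and write the prepared-statement SQL insert.
The seven components above only need to be registered one by one in a configuration class, and that configuration class must be annotated with @EnableBatchProcessing.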
@Configuration
@EnableBatchProcessing // enable batch processing support
@Import(DruidDBConfig.class) // import the DataSource configuration
public class CsvBatchConfig {
}
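To make the reader and processor roles described above more concrete, here is a minimal plain-Java sketch of the example from the list: one CSV line is mapped to a person object, the age is checked, and the gender value 男/女 is converted to M/F. The `PersonRow` class, its three fields, the column order, and the 0 to 150 age range are all assumptions made for illustration. The sketch deliberately uses only the JDK and is not the Spring Batch ItemReader/ItemProcessor API; the gender labels are written as Unicode escapes so the source stays ASCII-only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical entity; the field names are assumptions for illustration. */
final class PersonRow {
    final String name;
    final int age;
    final String gender;

    PersonRow(String name, int age, String gender) {
        this.name = name;
        this.age = age;
        this.gender = gender;
    }
}

/** Plain-Java stand-in for the reader and processor roles; not Spring Batch API. */
final class GenderProcessorSketch {

    // Chinese gender labels mapped to single-letter codes.
    private static final Map<String, String> GENDER_CODES = new HashMap<>();
    static {
        GENDER_CODES.put("\u7537", "M"); // U+7537, male
        GENDER_CODES.put("\u5973", "F"); // U+5973, female
    }

    /** Reader role: split one CSV line and map its columns to entity fields. */
    static PersonRow parseLine(String line) {
        String[] cols = line.split(",");
        if (cols.length != 3) {
            throw new IllegalArgumentException("expected 3 columns: " + line);
        }
        return new PersonRow(cols[0].trim(), Integer.parseInt(cols[1].trim()), cols[2].trim());
    }

    /** Processor role: validate the item first, then transform it. */
    static PersonRow process(PersonRow in) {
        if (in.age < 0 || in.age > 150) { // assumed validity range
            throw new IllegalArgumentException("age out of range: " + in.age);
        }
        String code = GENDER_CODES.get(in.gender);
        if (code == null) {
            throw new IllegalArgumentException("unknown gender: " + in.gender);
        }
        return new PersonRow(in.name, in.age, code);
    }

    public static void main(String[] args) {
        PersonRow p = process(parseLine("Zhang San,30,\u7537"));
        System.out.println(p.name + "," + p.age + "," + p.gender); // prints "Zhang San,30,M"
    }
}
```

Returning a new object from `process` instead of mutating the input keeps the validation and the conversion easy to test in isolation.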
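2. Batch processing flowchart
The flowchart below explains why the configuration class has to be defined the way it is; for the details, see the code in the hands-on section.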
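The read, process, write flow can also be sketched in plain Java. The version below is only an illustration of the idea of a chunk-oriented step: items are read and processed one at a time and handed to the writer in groups. The class `ChunkStepSketch`, the method `run`, and the chunk size of 2 are hypothetical names and values, not Spring Batch API. Treating a null processor result as "skip this item" mirrors how Spring Batch filters items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java sketch of a chunk-oriented step; names are illustrative only. */
final class ChunkStepSketch {

    /**
     * Reads items one at a time, processes each, and passes them to the writer
     * in lists of at most chunkSize items. A null processor result drops the item.
     * Returns the number of items handed to the writer.
     */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<O> chunk = new ArrayList<>(chunkSize);
        int written = 0;
        while (reader.hasNext()) {
            O out = processor.apply(reader.next());
            if (out != null) {
                chunk.add(out);
            }
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // one write per full chunk
                written += chunk.size();
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) { // flush the last, partially filled chunk
            writer.accept(new ArrayList<>(chunk));
            written += chunk.size();
        }
        return written;
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        Function<String, String> upper = s -> s.toUpperCase();
        Consumer<List<String>> collect = batches::add;
        int n = run(Arrays.asList("a", "b", "c", "d", "e").iterator(), upper, collect, 2);
        // prints "5 items in 3 chunks: [[A, B], [C, D], [E]]"
        System.out.println(n + " items in " + batches.size() + " chunks: " + batches);
    }
}
```

Writing in chunks rather than row by row is what keeps imports with millions of rows practical: the writer can issue one batched insert per chunk instead of one statement per item.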
II. Hands-on
1. Add the dependencies
1) Spring Batch dependency
<!-- spring batch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
2) Validator dependency
<!-- hibernate validator -->
<dependency>
    <groupId>org.hibernate</groupId>
    <artifactId>hibernate-validator</artifactId>
    <version>6.0.7.Final</version>
</dependency>
3) MySQL + Druid dependencies
<!-- mysql connector -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.35</version>
</dependency>
<!-- alibaba dataSource -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.12</version>
</dependency>
4) Test dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>
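2. application.yml configuration
When a job is launched and starts executing, Spring Batch records its state in a set of tables whose names start with batch, and it can create these tables automatically. The tables do not exist at first! The corresponding settings have to be added to the application configuration file.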
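spring:
  # batch
  batch:
    job:
      # By default (true) the defined Jobs run automatically at startup; when set to false, they must be started with JobLauncher.run
      enabled: false
    # Let Spring Batch create its default tables in the database; with MySQL, any value other than always leads to an error saying the tables do not exist
    initialize-schema: always
    # Set the prefix of the batch tables
    # table-prefix: csv-batch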
3. Data source configuration
# merge into the existing spring: block if the file already has one
spring:
  datasource:
    username: root
    password: 1234
    url: jdbc:mysql://127.0.0.1:3306/db_base?useSSL=false&serverTimezone=UTC&characterEncoding=utf8
    driver-class-name: com.mysql.jdbc.Driver
Register the DBConfig configuration class; it is later pulled into the batch configuration class with @Import.
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;

/**
 * @author jian
 * @date 2019/4/20
 * @description Custom DataSource
 */
@Configuration
public class DruidDBConfig {

    private static final Logger logger = LoggerFactory.getLogger(DruidDBConfig.class);

    @Value("${spring.datasource.url}")
    private String dbUrl;

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    @Value("${spring.datasource.driver-class-name}")
    private String driverClassName;

    /* Optional pool settings; uncomment them together with the matching keys in application.yml.
    @Value("${spring.datasource.initialSize}")
    private int initialSize;
    @Value("${spring.datasource.minIdle}")
    private int minIdle;
    @Value("${spring.datasource.maxActive}")
    private int maxActive;
    @Value("${spring.datasource.maxWait}")
    private int maxWait;
    @Value("${spring.datasource.timeBetweenEvictionRunsMillis}")
    private int timeBetweenEvictionRunsMillis;
    @Value("${spring.datasource.minEvictableIdleTimeMillis}")
    private int minEvictableIdleTimeMillis;
    @Value("${spring.datasource.validationQuery}")
    private String validationQuery;
    @Value("${spring.datasource.testWhileIdle}")
    private boolean testWhileIdle;
    @Value("${spring.datasource.testOnBorrow}")
    private boolean testOnBorrow;
    @Value("${spring.datasource.testOnReturn}")
    private boolean testOnReturn;
    @Value("${spring.datasource.poolPreparedStatements}")
    private boolean poolPreparedStatements;
    @Value("${spring.datasource.maxPoolPreparedStatementPerConnectionSize}")
    private int maxPoolPreparedStatementPerConnectionSize;
    @Value("${spring.datasource.filters}")
    private String filters;
    @Value("${spring.datasource.connectionProperties}")
    private String connectionProperties;
    */

    @Bean
    @Primary // highest priority when a DataSource is injected
    public DataSource dataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        logger.info("-------->dataSource[url=" + dbUrl + " ,username=" + username + "]");
        dataSource.setUrl(dbUrl);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setDriverClassName(driverClassName);

        /* Optional pool configuration; also requires import java.sql.SQLException.
        dataSource.setInitialSize(initialSize);
        dataSource.setMinIdle(minIdle);
        dataSource.setMaxActive(maxActive);
        dataSource.setMaxWait(maxWait);
        dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        dataSource.setValidationQuery(validationQuery);
        dataSource.setTestWhileIdle(testWhileIdle);
        dataSource.setTestOnBorrow(testOnBorrow);
        dataSource.setTestOnReturn(testOnReturn);
        dataSource.setPoolPreparedStatements(poolPreparedStatements);
        dataSource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize);
        try {
            dataSource.setFilters(filters);
        } catch (SQLException e) {
            logger.error("druid configuration initialization filter", e);
        }
        dataSource.setConnectionProperties(connectionProperties);
        */
        return dataSource;
    }

    /**
     * Registers the Druid monitoring servlet under /druid/*.
     */
    @Bean
    public ServletRegistrationBean druidServletRegistrationBean() {
        ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean();
        servletRegistrationBean.setServlet(new StatViewServlet());
        servletRegistrationBean.addUrlMappings("/druid/*");
        return servletRegistrationBean;
    }

    /**
     * Registers the Druid filter that intercepts requests.
     *
     * @return the filter registration
     */
    @Bean
    public FilterRegistrationBean duridFilterRegistrationBean() {
        FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
        filterRegistrationBean.setFilter(new WebStatFilter());
        Map<String, String> initParams = new HashMap<>();
        // requests the filter should ignore
        initParams.put("exclusions", "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*");
        filterRegistrationBean.setInitParameters(initParams);
        filterRegistrationBean.addUrlPatterns("/*");
        return filterRegistrationBean;
    }
}