springboot执⾏批量插⼊_SpringBoot-SpringBatch批处理使⽤
样。。。
1,Spring Batch 介绍
Spring Batch是⼀个开源的、全⾯的、轻量级的批处理框架,通过Spring Batch可以实现强⼤的批处理应⽤程序的开发。
Spring Batch还提供记录/跟踪、事务管理、作业处理统计、作业重启以及资源管理等功能。
Spring Batch结合定时任务可以发挥更⼤的作⽤。
Spring Batch提供了 ItemReader、ItemProcessor 和 ItemWriter 来完成数据的读取、处理以及写出操作,并且可以将批处理的执⾏状态持久化到数据库中。
2,样例说明
(1)本⽂通过⼀个简单的数据复制样例演⽰在 Spring Boot 中如何使⽤Spring Batch。假设现在有⼀个data.csv ⽂件,⽂件中保存了 4 条⽤户数据,我们需要通过批处理框架读取⽂件中的内容然后将之插⼊数据表中。
注意:为⽅便演⽰,本次样例只实现了 ItemReader(数据的读取)和 Itemwriter(数据的写出),中间的 ItemProcessor(数据的处理)不实现,即数据读取后不做任何处理,直接⼊库。
(2)data.csv⽂件存放在 classpath 下(项⽬的 resources ⽂件夹⾥),⾥⾯内容如下(每⾏各个字段间使⽤空格隔开):
id username sex
1 张三 男
2 李四 男
3 王五 男
4 赵六 ⼥
(3)要存⼊的数据表(user)的表结构如下:
3,项⽬配置
(1)⾸先编辑项⽬的l⽂件,添加 spring-boot-starter-batch依赖以及数据库相关依赖。
注意:添加数据库相关依赖是为了将批处理的执⾏状态持久化到数据库中(可以理解为执⾏时⾃动⽣成的⼀些⽇志记录)。
org.springframework.boot
spring-boot-starter-batch
org.springframework.boot
spring-boot-starter-jdbc
mysql
mysql-connector-java
com.alibaba
druid
1.1.9
(2)接着在application.properties 中进⾏数据库基本信息配置:
# 前⾯4⾏是数据库基本配置
pe=com.alibaba.druid.pool.DruidDataSourcespring.datasource.url=jdbc:mysql://localhost:3306/hangge? serverTimezone=Asia/Shanghaispring.datasource.username=rootspring.datasource.password=hangge1234# 项⽬启动时创建数据表(⽤于记录批处理执⾏状态)的 SQL 脚本,该脚本由Spring Batch提供
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql# 项⽬启动时执⾏建表
SQLspring.batch.initialize-schema=always# 默认情况下,项⽬启动时就会⾃动执⾏配置好的批处理操作。这⾥将其设为不⾃动执⾏,后⾯我们通过⼿动触发执⾏批处理spring.abled=false
(3)最后在项⽬启动类上添加@EnableBatchProcessing 注解开启 Spring Batch⽀持:
@SpringBootApplication
@EnableBatchProcessing
springboot结构public class DemoApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context
= SpringApplication.run(DemoApplication.class, args);
}
}
4,批处理配置
(1)创建 CsvBatchJobConfig 进⾏ Spring Batch 配置,代码如下:
(1)Spring Batch 提供了⼀些常⽤的 ItemReader(即数据的读取逻辑),例如:
JdbcPagingltemReader ⽤来读取数据库中的数据
StaxEventltemReader ⽤来读取 XML 数据
FlatFileltemReader ⽤来加载普通⽂件(本样例使⽤该 ItemReader)
(2)Spring Batch提供了⼀些常⽤的 ItemWriter(即数据的写出逻辑),例如:
FlatFileltemWriter 表⽰将数据写出为⼀个普通⽂件。
StaxEvenltemWriter 表⽰将数据写出为 XML。
还有针对不同数据库提供的写出操作⽀持类,如 MongoltemWriter、JpaltemWriter、Neo4jItemWriter 以及 HibernateltemWriter 等。
本案例使⽤的 JdbcBatchltemWriter 则是通过 JDBC 将数据写出到⼀个关系型数据库中。
@Configuration
public class CsvBatchJobConfig {
// 注⼊JobBuilderFactory,⽤来构建Job
@Autowired
JobBuilderFactory jobBuilderFactory;
// 注⼊StepBuilderFactory,⽤来构建Step
@Autowired
StepBuilderFactory stepBuilderFactory;
// 注⼊DataSource,⽤来⽀持持久化操作,这⾥持久化⽅案是Spring-Jdbc
@Autowired
DataSource dataSource;
// 配置⼀个ItemReader,即数据的读取逻辑
@Bean
@StepScope
FlatFileItemReader itemReader() {
// FlatFileItemReader 是⼀个加载普通⽂件的 ItemReader
FlatFileItemReader reader = new FlatFileItemReader<>();
/
/ 由于data.csv⽂件第⼀⾏是标题,因此通过setLinesToSkip⽅法设置跳过⼀⾏reader.setLinesToSkip(1);
// setResource⽅法配置data.csv⽂件的位置
reader.setResource(new ClassPathResource("data.csv"));
// 通过setLineMapper⽅法设置每⼀⾏的数据信息
reader.setLineMapper(new DefaultLineMapper(){{
setLineTokenizer(new DelimitedLineTokenizer(){{
// setNames⽅法配置了data.csv⽂件⼀共有4列,分别是id、username、以及sex, setNames("id","username","sex");
// 配置列与列之间的间隔符(这⾥是空格)
setDelimiter(" ");
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper(){{
setTargetType(User.class);
}});
}});
return reader;
}
// 配置ItemWriter,即数据的写出逻辑
@Bean
JdbcBatchItemWriter jdbcBatchItemWriter() {
// 使⽤的JdbcBatchltemWriter则是通过JDBC将数据写出到⼀个关系型数据库中。JdbcBatchItemWriter writer = new JdbcBatchItemWriter();
// 配置使⽤的数据源
writer.setDataSource(dataSource);
// 配置数据插⼊SQL,注意占位符的写法是":属性名"
writer.setSql("insert into user(id,username,sex) " +
"values(:id,:username,:sex)");
// 最后通过BeanPropertyItemSqlParameterSourceProvider实例将实体类的属性和SQL中的占位符⼀⼀映射writer.setItemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<>());
return writer;
}
// 配置⼀个Step
@Bean
Step csvStep() {
// Step通过stepBuilderFactory进⾏配置
("csvStep") //通过get获取⼀个StepBuilder,参数数Step的name
.chunk(2) //⽅法的参数2,表⽰每读取到两条数据就执⾏⼀次write操作
.reader(itemReader()) // 配置reader
.writer(jdbcBatchItemWriter()) // 配置writer
.build();
}
// 配置⼀个Job
@Bean
Job csvJob() {
// 通过jobBuilderFactory构建⼀个Job,get⽅法参数为Job的name
("csvJob")
.start(csvStep()) // 配置该Job的Step
.build();
}
}
(2)这⾥还涉及到了⼀个 User 实体类,代码如下:
@NoArgsConstructor
@Setter
@Getter
public class User {
private Integer id;
private String username;
private String sex;
}
5,创建 Controller
接下来创建 Controller,当⽤户发起⼀个请求是触发批处理:
@RestController
public class HelloController {
// JobLauncher 由框架提供
@Autowired
JobLauncher jobLauncher;
// Job 为刚刚配置的
@Autowired
Job job;
@GetMapping("/hello")
public void hello() {
try {
JobParameters jobParameters = new JobParametersBuilder()
.toJobParameters();
// 通过调⽤ JobLauncher 中的 run ⽅法启动⼀个批处理
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
e.printStackTrace();
}
}
}
6,运⾏测试
(2)同时我们发现数据库中还会⾃动创建了多个批处理相关的表,这些表⽤来记录批处理的执⾏状态。
注意:我们如果再次调⽤ Controller 接⼝触发批处理,数据也不会重复插⼊(即使 data.csv ⾥⾯内容改变了),因为这个批处理任务之前已经被记录为执⾏成功,所以不会再次执⾏。除⾮把 Job 名字改成其他的,⽐如“csvJob2” 。
附:增加 ItemProcessor 数据处理
在实际应⽤中,我们读取完数据后可能还需要先对数据进⾏⼀些处理,然后再⼊库。下⾯演⽰如何实现这个功能。
1,⾃定义 ItemProcessor
⾃定义处理器只需要实现 ItemProcessor 接⼝,重写 process ⽅法,输⼊的参数是从 ItemReader 读取到的数据,返回的数据给ItemWriter。这⾥我们将读取到对性别“男/⼥”转换成英⽂“M/F”
public class CvsItemProcessor implements ItemProcessor {
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论