SpringBatch的基本使⽤
简介
A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.
Spring Batch provides reusable functions that are essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. It also provides more advanced technical services and features that will enable extremely high-volume and high performance batch jobs through optimization and partitioning techniques.
Simple as well as complex, high-volume batch jobs can leverage the framework in a highly scalable manner to process significant volumes of information.
⼤体翻译如下:
⼀个轻量的、⼴泛的批处理框架,该框架的设计⽬的是为了⽀持对企业系统⽇常运营⾄关重要的批处理应⽤程序的开发。
Spring Batch 提供了在处理⼤量记录时必需的可复⽤功能,包括⽇志记录/跟踪、事务管理、任务处理统计、任务重启、任务跳过和资源管理。它也提供了更加⾼级的技术服务和特征,通过优化和分区的⽅式获得极⾼容量和⾼性能的批处理任务。简单和复杂的⼤容量批处理任务都可以以⾼度可扩展的⽅式利⽤该框架来处理⼤量的信息
处理架构
Spring Batch 的处理结构如下所⽰:
其中,任务的处理是在 Step 这个阶段定义的。在 Step 中,需要定义数据的读取、数据的处理、数据的写出操作,在这三个阶段中,数据的处理是真正进⾏数据处理的地⽅。具体 Step 的流程如下图所⽰:
Reader(架构图中的 Item Reader):主要的任务是定义数据的读取操作,包括读取⽂件的位置、对读取⾸先要进⾏的划分(如以 ',' 作为分隔符)、将读取到的⽂件映射到相关对象的属性字段等
Process(架构图中的 Item Processor):这⾥是真正对数据进⾏处理的地⽅,数据的处理逻辑都在这⾥定义
Writer(架构图中的 Item Writer):这个阶段的主要任务是定义数据的输出操作,包括将数据写⼊到数据库等
使⽤前的准备
在使⽤ Spring Batch 之前,⾸先需要创建 Spring Batch 需要的元数据表和它需要使⽤的元数据类型,这些可以在数据库中进⾏定义,这些元数据表和元数据类型是和 Spring Batch 中的域对象紧密相关的。
元数据表
元数据表的关联关系如下所⽰:
相关的表解释如下:
BATCH_JOB_INSTANCE:与这个表相对应的是 JobInstance 域对象,这个域对象是整个层次结构的顶层,表⽰具体的任务
BATCH_JOB_EXECUTION_PARAMS:与这个表对应的是 JobParameters 域对象,它包含了 0 个或多
个 key-value 键值对,作为每次运⾏任务时使⽤的参数,通过 JobParameters 对象和 Job 对象,可以得到唯⼀确定的JobInstance 实例
BATCH_JOB_EXECUTION:与这个表对应的是 JobExecution 域对象,每次运⾏⼀个任务时,都会创建⼀个新的 JobExecution对象
BATCH_STEP_EXECUTION:与这个表对应的是 StepExecution 对象,这个对象与 JobExecution 类似,与 JobExecution 相关联的地⽅在于⼀个 JobExecution 可以有多个 StepExecution
BATCH_JOB_EXECUTION_CONTEXT:这个表存储的是每个 Job 的执⾏上下⽂信息
BATCH_STEP_EXECUTION_CONTEXT:这个表存储的是每个 Job 中每个 Step 的执⾏上下⽂信息
元数据类型
BATCH_JOB_INSTANCE、BATCH_JOB_EXECUTION、BATCH_STEP_EXECUTION 这三个表都包含了以 _ID 结尾的列,这个列会作为它们所在表的实际主键。然⽽,这个列不是由数据库产⽣的,⽽是由单独的序列来产⽣的,这是因为:在插⼊数据之后,需要在插⼊的数据上设置给定的键,这样才能确保它们在 Java 应⽤中的唯⼀性。尽管较新的 JDBC ⽀持主键⾃增,但是为了能够更好地兼容,因此还是有必要为这三个数据表设置对应的序列类型。
定义元数据类型的 SQL 如下:
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;
由于有的数据库(如MySQL)不⽀持 SEQUENCE 这种类型,⼀般的做法是创建⼀个表来代理 SEQUENCE:
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL) ENGINE = InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL) ENGINE =InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL) ENGINE = InnoDB;
INSERT INTO BATCH_JOB_SEQ values(0);
最终创建元数据表和元数据类型的 SQL 脚本如下:
开始使⽤
在这个例⼦中,需要实现的功能为:从⼀个⽂件中批量地读取数据,将这些数据进⾏相应的转换(⼩写字母变⼤写),再将它们写⼊到数据库中。
对应的数据⽂本如下(sample-data.csv):
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
创建实体类
⾸先,创建处理的数据对应的实体和关联的数据表,如下所⽰:
// 与数据
public class Person {
private final String lastName;
private final String firstName;
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
// 省略⼀部分 getter ⽅法
}
对应的实体表如下所⽰:
-
- 以 PostgreSQL 为例
CREATE TABLE people
(
person_id  SERIAL8 NOT NULL PRIMARY KEY,
first_name VARCHAR(20),
last_name  VARCHAR(20)
);
处理的核⼼逻辑
接下来,需要定义对于每条数据的处理逻辑,处理逻辑对应的类需要实现 org.springframework.batch.item.ItemProcessor<I, O> 接⼝,其中,I 范型表⽰要处理的实体类类型,O表⽰经过处理之后返回的结果类型。由于这⾥只是对Peroson 类的名和姓进⾏⼤写的转换,因此输⼊类型和输出类型都是 Person
具体的处理逻辑如下:
import org.springframework.batch.item.ItemProcessor;
/*
处理数据的逻辑类,对应架构中的 Item Processor 部分
*/
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
@Override
public Person process(Person person) {
final String firstName = FirstName().toUpperCase();
final String lastName = LastName().toUpperCase();
// 可以在这个处理逻辑中定义⼀些其它的操作。。。。。。。。。
return new Person(firstName, lastName);
}
}
数据的读取
有了处理的核⼼逻辑部分之后,剩下的重要部分就是数据的输⼊和输出了,正如上⽂所描述的那样,输⼊部分定义了数据的来源、对初始数据进⾏处理等任务。
数据的读取部分如下所⽰:
@Bean(name = "reader") // 可以把这个 Bean 放在任意的⼀个配置类或组件类中
/*
由于读取的数据来源是来⾃⼀般的⽂件,因此采⽤ FlatFileItemReader 的实现类;
其它的实现类可以查看 org.springframework.batch.item.ItemReader 的实现类
*/
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>() // 以构建者模式的⽅式构建新的 FlatFileItemReader 对象
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv")) // 读取数据的来源,这⾥表⽰在类路径的 resourcec ⽬录下的 sample-data.csv ⽂件
.delimited().delimiter(",") // 指定每⾏的数据字段的分割符为 ,
.names("firstName", "lastName") // 将分割的字段映射到 firstName 和 lastName 属性字段
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(Person.class);
}}) // 这些分割的属性字段对应的类,使⽤ {{}} 的⽅式来将初始化的 BeanWrapperFieldSetMapper 调⽤ setTargetType ⽅法,可能是⼀个⽐较简洁的⽅式,但这种⽅式可能会导致内存泄漏
.
build(); // 通过设置的属性构造 FlatFileItemReader 对象
}
数据的写出
数据的写出部分如下所⽰:
@Bean(name = "writer")
/*
由于这⾥的写出是写⼊的数据库中,因此采⽤ JdbcBatchItemWriter 的实现类进⾏写出;spring怎么读取配置
其它的实现类可以查看 org.springframework.batch.item.ItemWriter 的实现类
*/
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>() // 以构建者模式的⽅式创建 JdbcBatchItemWriter 实例对象
.itemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<Person>() // 提供执⾏相关 SQL 需要的参数,这⾥以实体类(输出类)的⽅式存储需要的参数
)
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)") // 写⼊数据库中具体执⾏的 SQL
.dataSource(dataSource) // 设置数据源,这个对象可以⼿动创建,但是⼀般在配置相关的 datasource 属性之后,Spring 会⾃动⽣成这个类型的 Bean
.build();
}
整合到 Step
上⽂介绍过,⼀个 Step 包含了数据的读取、数据的处理、数据的写出三个部分,⽽⼀个批处理任务可以由多个 Step来组成。现在,需要做的是将上⽂提到的写⼊、处理、写出三个部分组合成为⼀个 Step
具体代码如下所⽰:
@Bean(name = "step1") // Step 也是⼀个 Bean,这⾥将它命名为 step1
// Step 类是批处理任务的执⾏单元
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory // 由 Spring 容器⾃⾏注⼊;注意这⾥使⽤的⼯⼚模式
.get("step1") // 创建⼀个会创建名为 step1 的 StepBuilder 对象;注意这⾥的构建者模式的使⽤
.<Person, Person>chunk(10) // 这个 Step ⼀次处理的数据的数量,前缀 <I, O> 范型表⽰的含义与 Item Process 中的⼀致,因此这⾥两个范型都设置为 Person
.reader(reader()) // 数据的读取部分
.processor(processor()) // 数据的处理部分
.writer(writer) // 写出部分,由于 writer 需要注⼊ DataSurce 对象,因此直接作为⼀个注⼊参数参数并使⽤会更加⽅便;当然,reader 和 process 也可以通过注⼊参数的⽅式直接使⽤,因为它们都被定义成了
Spring 中的 Bean            .build();
}
任务执⾏
如果想要在任务执⾏前或者任务执⾏之后做⼀些相关的处理操作,那么设置对应的任务执⾏会很有⽤。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.BatchStatus;
import org.JobExecution;
import org.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {
// ⽇志打印对象
private final static Logger log = Logger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
@Autowired
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override // 执⾏任务之前的⼀些操作
public void beforeJob(JobExecution jobExecution) {
log.info("Ready to ");
}
@Override // 在任务执⾏完成之后执⾏的⼀些操作,这⾥是执⾏完成之后查询写⼊到数据库中的结果
public void afterJob(JobExecution jobExecution) {
if (Status() == BatchStatus.COMPLETED) {
log.info(" JOB FINISHED! Time to verify the results");
jdbcTemplate.query("SELECT first_name, last_name FROM people",
(rs, rowNum) -> new String(1), rs.getString(2))
).forEach(person -> log.info("Found <" + String() + "> in the database."));
}
}
}
创建 Job
批处理的最顶层的抽象便是 Job,Job 是⼀个批处理任务,现在整合上⽂的内容,创建⼀个 Job
@Bean(name = "importUserJob")
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory // 这个⼯⼚对象是由 Spring ⾃动注⼊的;同样地,使⽤的是⼯⼚模式
.get("importUserJob") // 创建⼀个会创建 importUserJob 任务的 JobBuilder 对象;构建者模式
.incrementer(new RunIdIncrementer()) // 增加这个 Job 的参数信息,具体可以参见 Spring Batch 的元数据信息
.listener(listener) // 添加之前创建的任务执⾏,使得在任务开始和结束时执⾏相应的操作
.flow(step1) // 添加上⽂定义的 step1 处理
.end() // 任务结束
.build();
}
如果想要添加多个 Step,那么可以按照下⾯的⽅式进⾏添加:
@Bean(name = "importUserJob")
public Job importUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory
.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.start(step1) // 定义的 step1
.
next(step2) // 定义的 step2
.build();
}
值得注意的是,由于上⽂定义的任务执⾏监听的是任务(即 Job)的状态,因此当添加多个 Step 时,只有在完成最后的 Step 之后才会触发这个事件监听。启动批处理任务
如果想要启动批处理任务,⾸先需要创建⼀个配置类,如下所⽰:
mport org.Job;
import org.Step;
import org.onfiguration.annotation.EnableBatchProcessing;
import org.onfiguration.annotation.JobBuilderFactory;
import org.launch.support.RunIdIncrementer;
import t.annotation.Bean;
import t.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
@EnableBatchProcessing // 使得能够开启批处理任务处理,这样 JobBuilderFactory 才能够被 Spring 注⼊
public class TestConfiguration {
private final JobBuilderFactory jobBuilderFactory;
@Resource(name = "step1")
private Step step1;
@Resource(name = "step2")
private Step step2;
public TestConfiguration(JobBuilderFactory jobBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
}
@Bean(name = "importUserJob") // 具体的任务 Bean,这个 Bean 会在 Spring 容器启动的时候进⾏加载,因此任务也会执⾏
public Job importUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory
.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.start(step1)
.next(step2)
.build();
}
}
除了在配置类中加上 @EnableBatchProcessing 开启批处理任务之外,在配置⽂件 l ⽂件中也需要做相应的配置:
spring:
batch:
job:
enabled: true # 使得能够开启批处理任务
现在,启动 Spring 应⽤程序(可以使⽤ SpringApplication.run() ⽅法来启动),你会发现正在进⾏批处理任务:
任务执⾏完成之后,查看数据库的写⼊内容:
可以发现,处理过的数据已经成功写⼊到数据库中了
查看执⾏的⽇志,可能如下所⽰:
以上就是有关 Spring Batch 的⼀些基本使⽤,希望它可以帮到你
参考:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。