pringBoot系列 - 批处理
本文最后更新于 2024-03-23,欢迎来到我的Blog! https://www.zpeng.site/
SpringBoot系列 - 批处理
Spring Batch是一个轻量级的框架,完全面向Spring的批处理框架,用于企业级大量的数据读写处理系统。以POJO和Spring 框架为基础, 包括日志记录/跟踪,事务管理、 作业处理统计工作重新启动、跳过、资源管理等功能。
Spring Batch官网是这样介绍的自己:一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。
框架主要有以下功能:
Transaction management(事务管理)
Chunk based processing(基于块的处理)
Declarative I/O(声明式的输入输出)
Start/Stop/Restart(启动/停止/再启动)
Retry/Skip(重试/跳过)
如果你的批处理程序需要使用上面的功能,那就大胆地使用它吧。
框架介绍
框架一共有5个主要角色:
JobRepository是用户注册Job的容器,就是存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等等信息。
JobLauncher是任务启动器,通过它来启动任务,可以看做是程序的入口。
Job代表着一个具体的任务。
Step代表着一个具体的步骤,一个Job可以包含多个Step(想象把大象放进冰箱这个任务需要多少个步骤你就明白了)。
Item就是输出->处理->输出,一个完整Step流程。
下面简要的介绍一下这5个角色
JobRepository
JobRepository用于存储任务执行的状态信息,比如什么时间点执行了什么任务、任务执行结果如何等等。 框架提供了2种实现,一种是通过Map形式保存在内存中,当Java程序重启后任务信息也就丢失了, 并且在分布式下无法获取其他节点的任务执行情况;另一种是保存在数据库中,并且将数据保存在下面6张表里:
BATCH_JOB_INSTANCE
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_EXECUTION
BATCH_STEP_EXECUTION
BATCH_JOB_EXECUTION_CONTEXT
BATCH_STEP_EXECUTION_CONTEXT
Spring Batch框架的JobRepository支持主流的数据库:DB2、Derby、H2、HSQLDB、MySQL、Oracle、PostgreSQL、SQLServer、Sybase。
JobLauncher
JobLauncher是任务启动器,该接口只有一个run方法:
public interface JobLauncher {
JobExecution run(Job job, JobParameters jobParameters);
}
除了传入Job对象之外,还需要传入JobParameters对象,后续讲到Job再解释为什么要多传一个JobParameters。 通过JobLauncher可以在Java程序中调用批处理任务,也可以通过命令行或者其他框架 (如定时调度框架Quartz、Web后台框架Spring MVC)中调用批处理任务。 Spring Batch框架提供了一个JobLauncher的实现类SimpleJobLauncher。
Job
Job代表着一个任务,一个Job与一个或者多个JobInstance相关联,而一个JobInstance又与一个或者多个JobExecution相关联:
考虑到任务可能不是只执行一次就再也不执行了,更多的情况可能是定时任务,如每天执行一次,每个星期执行一次等等, 那么为了区分每次执行的任务,框架使用了JobInstance。如上图所示,Job是一个EndOfDay(每天最后时刻执行的任务), 那么其中一个JobInstance就代表着2007年5月5日那天执行的任务实例。 框架通过在执行JobLauncher.run(Job, JobParameters)方法时传入的JobParameters来区分是哪一天的任务。
由于2007年5月5日那天执行的任务可能不会一次就执行完成,比如中途被停止,或者出现异常导致中断, 需要多执行几次才能完成,所以框架使用了JobExecution来表示每次执行的任务。
Step
一个Job任务可以分为几个Step步骤,与JobExection相同,每次执行Step的时候使用StepExecution来表示执行的步骤。 每一个Step还包含着一个ItemReader、ItemProcessor、ItemWriter,下面分别介绍这三者。
ItemReader
ItemReader代表着读操作,其接口如下:
public interface ItemReader<T> {
T read();
}
框架已经提供了多种ItemReader接口的实现类,包括对文本文件、CSV文件、XML文件、数据库、JMS消息等读的处理,当然我们也可以自己实现该接口。
ItemProcessor
ItemReader代表着处理操作,其接口如下:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
process方法的形参传入I类型的对象,通过处理后返回O型的对象。开发者可以实现自己的业务代码来对数据进行处理。
ItemWriter
ItemReader代表着写操作,其接口如下:
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
框架已经提供了多种ItemWriter接口的实现类,包括对文本文件、CSV文件、XML文件、数据库、JMS消息等写的处理,当然我们也可以自己实现该接口。
Job监听器
还可以自定义任务监听器,在任务启动和完成之后进行相应的通知和响应。
/**
* 监听器实现JobExecutionListener接口,并重写其beforeJob,afterJob方法即可
*/
public class MyJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
}
}
数据校验
我们可以JSR-303(主要实现由hibernate-validator)的注解,来校验ItemReader读取到的数据是否满足要求。
首先让我们的ItemProcessor实现ValidatingItemProcessor接口:
public class MyItemProcessor extends ValidatingItemProcessor<User> {
@Override
public User process(User item) throws ValidationException {
super.process(item);
return item;
}
}
然后定义自己的校验器,实现的Validator接口来自于Spring,我们将使用JSR-303的Validator来校验:
public class MyBeanValidator<T> implements Validator<T>,InitializingBean {
private Validator validator;
@Override
public void afterPropertiesSet() throws Exception {
}
@Override
public void validate(T value)throws ValidationException{
}
}
在定义我们的MyItemProcessor时必须将MyBeanValidator设置进去,代码如下:
@Bean
public ItemProcessor<User,User> processor(){
//新建ItemProcessor接口的实现类返回
MyItemProcessor processor = new MyItemProcessor();
processor.setValidator(myBeanValidator());
return processor;
}
@Bean
public Validator<User> myBeanValidator(){
return new MyBeanValidator<User>();
}
SpringBoot集成实例
Spring Boot对Spring Batch支持的源码位于org.springframework.boot.autoconfigure.batch
下。
Spring Boot为我们自动初始化了Spring Batch存储批处理记录的数据库,且当我们程序启动时, 会自动执行我们定义的Job的Bean,不过我们可以通过配置定时器或手动触发方式启动。
Spring Boot提供如下属性来定制Spring Batch:
#启动时要执行的job,默认执行全部job
spring.batch.job.name=job1,job2
#是否自动执行定义的job,默认是
spring.batch.job.enabled=true
#是否初始化Spring Batch的数据库,默认为是
spring.batch.initializer.enabled=true
spring.batch.schema=
#设置Spring Batch的数据库表的前缀
spring.batch.table-prefix=
下面通过一个真实的例子,需要将客户导过来的csv文件导入到我们的业务Oracle数据库中,来说明怎样在SpringBoot中使用批处理框架。
添加maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<exclusions>
<exclusion>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--添加hibernate-validator依赖,作为数据校验使用-->
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.0.7.Final</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.1.Final</version>
</dependency>
<dependency>
<groupId>javax.el</groupId>
<artifactId>javax.el-api</artifactId>
<version>3.0.1-b04</version>
</dependency>
<dependency>
<groupId>org.glassfish.web</groupId>
<artifactId>javax.el</artifactId>
<version>2.2.6</version>
</dependency>
<!--添加hibernate-validator依赖 END-->
<!-- Oracle驱动包 -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.4.0-atlassian-hosted</version>
</dependency>
配置application.yml
################### spring配置 ###################
spring:
profiles:
active: dev
batch:
job:
enabled: false
datasource:
driver-class-name: oracle.jdbc.driver.OracleDriver
url: jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:orcl11g
username: adm_real
password: adm_real
真实csv数据,位于src/main/resources/NT_BSC_BUDGETVTOLL.csv
中
表定义如下:
CREATE TABLE "ADM_REAL"."NT_BSC_BUDGETVTOLL" (
"F_ID" VARCHAR2(100 BYTE) NOT NULL ,
"F_YEAR" VARCHAR2(4 BYTE) NULL ,
"F_TOLLID" VARCHAR2(50 BYTE) NULL ,
"F_BUDGETID" VARCHAR2(50 BYTE) NULL ,
"F_CBUDGETID" VARCHAR2(50 BYTE) NULL ,
"F_VERSION" VARCHAR2(1 BYTE) DEFAULT '1' NULL ,
"F_AUDITMSG" VARCHAR2(100 BYTE) NULL ,
"F_TRIALSTATUS" VARCHAR2(1 BYTE) DEFAULT '0' NULL ,
"F_FIRAUDITER" VARCHAR2(50 BYTE) NULL ,
"F_FIRAUDITTIME" VARCHAR2(20 BYTE) NULL ,
"F_FINAUDITER" VARCHAR2(50 BYTE) NULL ,
"F_FINAUDITTIME" VARCHAR2(64 BYTE) NULL ,
"F_EDITTIME" VARCHAR2(64 BYTE) NULL ,
"F_STARTDATE" VARCHAR2(8 BYTE) NULL ,
"F_ENDDATE" VARCHAR2(8 BYTE) NULL ,
)
领域模型类
public class BudgetVtoll {
private String id;
private String year;
private String tollid;
private String budgetid;
private String cbudgetid;
private String version;
/**
* 使用JSR-303注解来校验数据
*/
@Size(max = 100)
private String auditmsg;
private String trialstatus;
private String firauditer;
private String firaudittime;
private String finauditer;
private String finaudittime;
private String edittime;
private String startdate;
private String enddate;
// 下面省略get/set方法
}
数据处理及校验
定义处理器
public class CsvItemProcessor extends ValidatingItemProcessor<BudgetVtoll> {
@Override
public BudgetVtoll process(BudgetVtoll item) throws ValidationException {
/*
* 需要执行super.process(item)才会调用自定义校验器
*/
super.process(item);
/*
* 对数据进行简单的处理和转换 todo
*/
return item;
}
}
校验器定义:
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {
private javax.validation.Validator validator;
@Override
public void validate(T value) throws ValidationException {
/*
* 使用Validator的validate方法校验数据
*/
Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
if (constraintViolations.size() > 0) {
StringBuilder message = new StringBuilder();
for (ConstraintViolation<T> constraintViolation : constraintViolations) {
message.append(constraintViolation.getMessage()).append("\n");
}
throw new ValidationException(message.toString());
}
}
/**
* 使用JSR-303的Validator来校验我们的数据,在此进行JSR-303的Validator的初始化
*/
@Override
public void afterPropertiesSet() {
ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
validator = validatorFactory.usingContext().getValidator();
}
}
任务监听器
public class CsvJobListener implements JobExecutionListener {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private long startTime;
private long endTime;
@Override
public void beforeJob(JobExecution jobExecution) {
startTime = System.currentTimeMillis();
logger.info("任务处理开始");
}
@Override
public void afterJob(JobExecution jobExecution) {
endTime = System.currentTimeMillis();
logger.info("任务处理结束,总耗时=" + (endTime - startTime) + "ms");
}
}
配置类
@Configuration
@EnableBatchProcessing
public class CsvBatchConfig {
/**
* ItemReader定义,用来读取数据
* 1,使用FlatFileItemReader读取文件
* 2,使用FlatFileItemReader的setResource方法设置csv文件的路径
* 3,对此对cvs文件的数据和领域模型类做对应映射
*
* @return FlatFileItemReader
*/
@Bean
@StepScope
public FlatFileItemReader<BudgetVtoll> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) {
FlatFileItemReader<BudgetVtoll> reader = new FlatFileItemReader<>();
// reader.setResource(new ClassPathResource(pathToFile));
reader.setResource(new FileSystemResource(pathToFile));
reader.setLineMapper(new DefaultLineMapper<BudgetVtoll>() {
{
setLineTokenizer(new DelimitedLineTokenizer(",") {
{
setNames(new String[]{
"id","year","tollid","budgetid", "cbudgetid", "version", "auditmsg", "trialstatus",
"firauditer", "firaudittime", "finauditer", "finaudittime", "edittime", "startdate", "enddate"
});
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<BudgetVtoll>() {
{
setTargetType(BudgetVtoll.class);
}
});
}
});
return reader;
}
/**
* ItemProcessor定义,用来处理数据
*
* @return
*/
@Bean
public ItemProcessor<BudgetVtoll, BudgetVtoll> processor() {
//使用我们自定义的ItemProcessor的实现CsvItemProcessor
CsvItemProcessor processor = new CsvItemProcessor();
//为processor指定校验器为CsvBeanValidator()
processor.setValidator(csvBeanValidator());
return processor;
}
/**
* ItemWriter定义,用来输出数据
* spring能让容器中已有的Bean以参数的形式注入,Spring Boot已经为我们定义了dataSource
*
* @param dataSource
* @return
*/
@Bean
public ItemWriter<BudgetVtoll> writer(DruidDataSource dataSource) {
JdbcBatchItemWriter<BudgetVtoll> writer = new JdbcBatchItemWriter<>();
//我们使用JDBC批处理的JdbcBatchItemWriter来写数据到数据库
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
String sql = "insert into BudgetVtoll " + " (f_id,f_year,f_tollid,f_budgetid,f_cbudgetid,f_version,f_auditmsg,f_trialstatus,f_firauditer,f_firaudittime,f_finauditer,f_finaudittime,f_edittime,f_startdate,f_enddate) "
+ " values(:id,:year,:tollid,:budgetid,:cbudgetid,:version,:auditmsg,:trialstatus,:firauditer,:firaudittime,:finauditer,:finaudittime,:edittime,:startdate,:enddate)";
//在此设置要执行批处理的SQL语句
writer.setSql(sql);
writer.setDataSource(dataSource);
return writer;
}
/**
* JobRepository,用来注册Job的容器
* jobRepositor的定义需要dataSource和transactionManager,Spring Boot已为我们自动配置了
* 这两个类,Spring可通过方法注入已有的Bean
*
* @param dataSource
* @param transactionManager
* @return
* @throws Exception
*/
@Bean
public JobRepository jobRepository(DruidDataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDatabaseType(String.valueOf(DatabaseType.ORACLE));
// 下面事务隔离级别的配置是针对Oracle的
jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
jobRepositoryFactoryBean.afterPropertiesSet();
return jobRepositoryFactoryBean.getObject();
}
/**
* JobLauncher定义,用来启动Job的接口
*
* @param dataSource
* @param transactionManager
* @return
* @throws Exception
*/
@Bean
public SimpleJobLauncher jobLauncher(DruidDataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
return jobLauncher;
}
/**
* Job定义,我们要实际执行的任务,包含一个或多个Step
*
* @param jobBuilderFactory
* @param s1
* @return
*/
@Bean
public Job importJob(JobBuilderFactory jobBuilderFactory, Step s1) {
return jobBuilderFactory.get("importJob")
.incrementer(new RunIdIncrementer())
.flow(s1)//为Job指定Step
.end()
.listener(csvJobListener())//绑定监听器csvJobListener
.build();
}
/**
* step步骤,包含ItemReader,ItemProcessor和ItemWriter
*
* @param stepBuilderFactory
* @param reader
* @param writer
* @param processor
* @return
*/
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<BudgetVtoll> reader, ItemWriter<BudgetVtoll> writer,
ItemProcessor<BudgetVtoll, BudgetVtoll> processor) {
return stepBuilderFactory
.get("step1")
.<BudgetVtoll, BudgetVtoll>chunk(65000)//批处理每次提交65000条数据
.reader(reader)//给step绑定reader
.processor(processor)//给step绑定processor
.writer(writer)//给step绑定writer
.build();
}
@Bean
public CsvJobListener csvJobListener() {
return new CsvJobListener();
}
@Bean
public Validator<BudgetVtoll> csvBeanValidator() {
return new MyBeanValidator<>();
}
}
注意,在定义Job的时候,我通过@StepScope
注解,可以通过传递参数的方式将csv文件路径传进去。
测试类
最后再让我们写个测试类看看能不能成功:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class BatchServiceTest {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importJob;
@Test
public void testBatch1() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", "E:\\NT_BSC_BUDGETVTOLL.csv")
.toJobParameters();
jobLauncher.run(importJob, jobParameters);
logger.info("testBatch1执行完成");
}
}
把csv文件放入resources/目录下面,然后执行测试。看看输出结果:
: Job: [FlowJob: [name=importJob]] launched with the following parameters: [{time=1517654976174, input.file.name=E:\NT_BSC_BUDGETVTOLL.csv}]
: 任务处理开始
: Executing step: [step1]
: 任务处理结束,总耗时=810ms
: Job: [FlowJob: [name=importJob]] completed with the following parameters: [{time=1517654976174, input.file.name=E:\NT_BSC_BUDGETVTOLL.csv}] and the following status: [COMPLETED]
: testBatch1执行完成
再去查看数据库里面的数据,已经正常写入了。
并发执行多个Job
SpringBatch批处理框架默认使用单线程完成所有任务的执行,官方推荐配置任务执行器来并发执行, 提高批处理的效率。
Spring Core 为我们提供了多种执行器实现(包括多种异步执行器),我们可以根据实际情况灵活选择使用。
配置线程池执行Job:
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(200);
return taskExecutor;
}
@Bean
public SimpleJobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, DruidDataSource dataSource,
PlatformTransactionManager transactionManager) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
return jobLauncher;
}
重点是上面的那句jobLauncher.setTaskExecutor(taskExecutor);
这里的并发表示的是执行不同的Job使用线程池,每个Job实例会分配一个线程去执行。这个是最推荐的做法。
如果你还想对单个Job执行逻辑采用多线程,可以再Step配置中加入线程池支持,不过需要保证你所有的Step都是线程安全的:
return stepBuilderFactory
.get("logStep1")
//设置每个Job通过并发方式执行,一般来讲一个Job就让它串行完成的好
.taskExecutor(new SimpleAsyncTaskExecutor())
//并发任务数为 10,默认为4
.throttleLimit(10)
.build();
这里我通过一个实际例子展示如何编写多个Job并发执行。
还是跟上面例子一样,将csv文件导入到表中,但是这个时候有两个csv文件,我要导入到两张表。
CREATE TABLE Z_TEST_APP (
appid INT,
zname VARCHAR2 (20),
flag VARCHAR2 (2),
CONSTRAINT app_pk PRIMARY KEY (appid)
);
CREATE TABLE Z_TEST_LOG (
logid INT,
msg VARCHAR2 (20),
logtime VARCHAR2 (8),
CONSTRAINT log_pk PRIMARY KEY (logid)
);
每个类型任务单独创建一个package,然后里面放两个类。以APP为例:
创建一个App类:
public class App {
private int appid;
private String zname;
private String flag;
}
然后创建一个AppConfig类配置任务:
@Configuration
public class AppConfig {
/**
* ItemReader定义,用来读取数据
* 1,使用FlatFileItemReader读取文件
* 2,使用FlatFileItemReader的setResource方法设置csv文件的路径
* 3,对此对cvs文件的数据和领域模型类做对应映射
*
* @return FlatFileItemReader
*/
@Bean(name = "appReader")
@StepScope
public FlatFileItemReader<App> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) {
FlatFileItemReader<App> reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource(pathToFile));
reader.setLineMapper(new DefaultLineMapper<App>() {
{
setLineTokenizer(new DelimitedLineTokenizer("|") {
{
setNames(new String[]{
"appid", "zname", "flag"
});
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<App>() {
{
setTargetType(App.class);
}
});
}
});
return reader;
}
// ....后面省略
}
注意,每个Bean配置都加一个name属性,然后自动注入里面需要通过@Qualifier注解来指定当前类中的Bean, 因为如果不指定name,Spring默认只会初始化一个Bean实例。
比如定义Job:
@Bean(name = "zappJob")
public Job zappJob(JobBuilderFactory jobBuilderFactory, @Qualifier("zappStep1") Step s1) {
return jobBuilderFactory.get("zappJob")
.incrementer(new RunIdIncrementer())
.flow(s1)//为Job指定Step
.end()
.listener(new MyJobListener("App"))//绑定监听器csvJobListener
.build();
}
另外一个LOG任务的配置也是同样。
好了,定义完成之后开始写测试方法:
@Test
public void testTwoJobs() throws Exception {
JobParameters jobParameters1 = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", p.getCsvApp())
.toJobParameters();
jobLauncher.run(zappJob, jobParameters1);
JobParameters jobParameters2 = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", p.getCsvLog())
.toJobParameters();
jobLauncher.run(zlogJob, jobParameters2);
logger.info("main线程完成");
while (true) {
Thread.sleep(2000000L);
}
}
这个是运行日志:
[ taskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zappJob]] launched with t
[ taskExecutor-1] com.xncoding.trans.modules.MyJobListener : 任务-App处理开始
[ main] com.xncoding.service.BatchServiceTest : main线程完成
[ taskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zlogJob]] launched with t
[ taskExecutor-2] com.xncoding.trans.modules.MyJobListener : 任务-Log处理开始
[ taskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [zappStep1]
[ taskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [logStep1]
[ taskExecutor-2] com.xncoding.trans.modules.MyJobListener : 任务-Log处理结束,总耗时=495ms
[ taskExecutor-1] com.xncoding.trans.modules.MyJobListener : 任务-App处理结束,总耗时=585ms
[ taskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zlogJob]] completed with
[ taskExecutor-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=zappJob]] completed with
异常处理
默认情况下,Spring Batch遇到异常的时候会终止处理,比如遇到csv文件中解析错误就会终止异常。如果我想忽略掉这些异常继续处理, 可以配置在Reader中忽略异常。
这个在Step的定义中配置:
return stepBuilderFactory
.get("logStep1")
.<Log, Log>chunk(5000)//批处理每次提交5000条数据
.reader(reader)//给step绑定reader
.processor(processor)//给step绑定processor
.writer(writer)//给step绑定writer
.faultTolerant()
.retry(Exception.class) // 重试
.noRetry(ParseException.class)
.retryLimit(1) //每条记录重试一次
.skip(Exception.class)
.skipLimit(200) //一共允许跳过200次异常
// .taskExecutor(new SimpleAsyncTaskExecutor()) //设置每个Job通过并发方式执行,一般来讲一个Job就让它串行完成的好
// .throttleLimit(10) //并发任务数为 10,默认为4
.build();
上面定义了使用可容忍异常模式,遇到Exception异常就重试1次,对于ParseException异常不重试,所有异常都会忽略掉,不会导致程序终止。 但是最大允许跳过200次异常,超过这个数字就终止执行了。
然后改一下csv文件,把某个字段改大点,超过数据库中定义长度:
1|等哈哈哈|01
2|等哈哈哈|02
3|等哈哈哈|025555
4|等哈哈哈|02
5|等哈哈哈|02
重新执行发现正常执行完成,数据库中只插入了4条数据,id为3的没有。
通用配置
更进一步,如果有多个CSV文件需要导入,那么安装上面的写法。每次都要定义一个新的Config类,一个新的Bean类,代码重复率很高。
实际上可以定义一个通用配置,去掉里面的显示Bean类,通过Java反射机制,还有@StepScope注解, 实现每次运行时候根据JobParameters初始化不同的Job。
如果有一个新的CSV文件需要导入,只需要新建一个Bean,定义好相应的列,然后将Bean的属性列表、插入SQL语句作为参数传入即可。
测试代码:
/**
* 测试一个配置类,可同时运行多个任务
* @throws Exception 异常
*/
@Test
public void testCommonJobs() throws Exception {
JobParameters jobParameters1 = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.addString(KEY_JOB_NAME, "App")
.addString(KEY_FILE_NAME, p.getCsvApp())
.addString(KEY_VO_NAME, "com.xncoding.trans.modules.zapp.App")
.addString(KEY_COLUMNS, String.join(",", new String[]{
"appid", "zname", "flag"
}))
.addString(KEY_SQL, "insert into z_test_App (appid, zname, flag) values(:appid, :zname, :flag)")
.toJobParameters();
jobLauncher.run(commonJob, jobParameters1);
JobParameters jobParameters2 = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.addString(KEY_JOB_NAME, "Log")
.addString(KEY_FILE_NAME, p.getCsvLog())
.addString(KEY_VO_NAME, "com.xncoding.trans.modules.zlog.Log")
.addString(KEY_COLUMNS, String.join(",", new String[]{
"logid", "msg", "logtime"
}))
.addString(KEY_SQL, "insert into z_test_Log (logid, msg, logtime) values(:logid, :msg, :logtime)")
.toJobParameters();
jobLauncher.run(commonJob, jobParameters2);
logger.info("Main线程执行完成");
while (true) {
Thread.sleep(2000000L);
}
}
FAQ
在进行Oracle操作的时候报一个错误:
ORA-08177: 无法连续访问此事务处理
原因:Spring Batch 默认是 ISOLATION_SERIALIZABLE
官方文档说明:
The default is ISOLATION_SERIALIZABLE, which prevents accidental concurrent execution of the same job (ISOLATION_REPEATABLE_READ would work as well).
而Oracle默认的事务隔离级别是ISOLATION_READ_COMMITTED
解决方案就是修改JobRepositoryFactoryBean
的定义,加一个配置:
jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
- 感谢你赐予我前进的力量