2025年SpringBatch文档

SpringBatch文档第一章 SpringBatch 入门 第一节 SpringBatch 概述 Spring Batch 是一个轻量级的 完善的批处理框架 旨在帮助企业建立健壮 高效的批处理应用 Spring Batch 提供了大量可重用的组件 包括了日志 追踪 事务 任务作业统计 任务重启 跳过 重复 资源管理 对于大数据量和高性能的批处理任务的分区功能 远程功能 Spring Batch 是一个批处理应用框架

第一章 SpringBatch 入门

第一节 SpringBatch概述

Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。

Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务的分区功能、远程功能

Spring Batch 是一个批处理应用框架,不是调度框架,但需要和调度框架合作来构建完成的批处理任务。它只关注批处理任务相关的问题,如事务、并发、监控、执行等,并不提供相应的调度功能。一般和调度框架例如quatrz结合使用

框架主要有以下功能:

Transaction management(事务管理)
Chunk based processing(基于块的处理)
Declarative I/O(声明式的输入输出)
Start/Stop/Restart(启动/停止/再启动)
Retry/Skip(重试/跳过)

框架一共有4个主要角色:JobLauncher是任务启动器,通过它来启动任务,可以看做是程序的入口。Job代表着一个具体的任务。Step代表着一个具体的步骤,一个Job可以包含多个Step(想象把大象放进冰箱这个任务需要多少个步骤你就明白了)。JobRepository是存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等等信息。

第二节 搭建SpringBatch项目

https://start.spring.io/

 
org.springframework.boot
spring-boot-starter-batch


org.springframework.batch
spring-batch-test
test

/code>

code class='prism'>


mysql
mysql-connector-java

/dependency>

br />

dependency>


org.springframework.boot

spring-boot-starter-jdbc

/artifactId>

br />

br /> .start(jobFlowStep1())

br /> .next(jobFlowStep2())

br /> .build();

br /> }

br />

br /> // 创建Job

br /> @Bean

br /> public Job jobFlowDemo2Job() {

br />

br /> return jobBuilderFactory.get("jobFlowDemo2Job")

br /> .start(jobFlowDemo2Flow())

br /> //TODO 继续使用step

br /> .next(jobFlowStep3())

br /> .end()

br /> .build();

br /> }

br />

br /> }

br />

br /> import org.springframework.batch.core.Job;

br /> import org.springframework.batch.core.Step;

br /> import org.springframework.batch.core.StepContribution;

br /> import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;

br /> import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

br /> import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

br /> import org.springframework.batch.core.job.builder.FlowBuilder;

br /> import org.springframework.batch.core.job.flow.Flow;

br /> import org.springframework.batch.core.scope.context.ChunkContext;

br /> import org.springframework.batch.core.step.tasklet.Tasklet;

br /> import org.springframework.batch.repeat.RepeatStatus;

br /> import org.springframework.beans.factory.annotation.Autowired;

br /> import org.springframework.context.annotation.Bean;

br /> import org.springframework.context.annotation.Configuration;

br /> import org.springframework.core.task.SimpleAsyncTaskExecutor;

br />

br /> /** * @author hongtao.hao * @date 2019/7/3 */

br /> @Configuration

br /> @EnableBatchProcessing

br /> public class JobSplitDemo3 {

br />

br />

br /> @Autowired

br /> private JobBuilderFactory jobBuilderFactory;

br />

br /> @Autowired

br /> private StepBuilderFactory stepBuilderFactory;

br />

br /> @Bean

br /> public Step jobSplitStep1() {

br />

br /> return stepBuilderFactory.get("jobSplitStep1").tasklet(new Tasklet() {

br />

br /> @Override

br /> public RepeatStatus execute(StepContribution arg0, ChunkContext chunkContext) throws Exception {

br />

br /> System.out

br /> .println(chunkContext.getStepContext().getStepName() + "," + Thread.currentThread().getName());

br /> return RepeatStatus.FINISHED;

br /> }

br /> }).build();

br /> }

br />

br /> @Bean

br /> public Step jobSplitStep2() {

br />

br /> return stepBuilderFactory.get("jobSplitStep2").tasklet(new Tasklet() {

br />

br /> @Override

br /> public RepeatStatus execute(StepContribution arg0, ChunkContext chunkContext) throws Exception {

br />

br /> System.out

br /> .println(chunkContext.getStepContext().getStepName() + "," + Thread.currentThread().getName());

br /> return RepeatStatus.FINISHED;

br /> }

br /> }).build();

br /> }

br />

br /> @Bean

br /> public Step jobSplitStep3() {

br />

br /> return stepBuilderFactory.get("jobSplitStep3").tasklet(new Tasklet() {

br />

br /> @Override

br /> public RepeatStatus execute(StepContribution arg0, ChunkContext chunkContext) throws Exception {

br />

br /> System.out

br /> .println(chunkContext.getStepContext().getStepName() + "," + Thread.currentThread().getName());

br /> return RepeatStatus.FINISHED;

br /> }

br /> }).build();

br /> }

br />

br /> @Bean

br /> public Flow jobSplitFLow1() {

br />

br /> return new FlowBuilder

Flow>("jobSplitFLow1").start(jobSplitStep1()).build();


}



@Bean


public Flow jobSplitFLow2() {



return new FlowBuilder

("jobSplitFLow2")
.start(jobSplitStep2())
.next(jobSplitStep3())
.build();
}

@Bean
Job jobSplitJob() {

return jobBuilderFactory.get("jobSplitJob")
.start(jobSplitFLow1())
//TODO 利用split完成 jobSplitFLow1 和 jobSplitFLow2 并发(同时)执行
.split(new SimpleAsyncTaskExecutor())
.add(jobSplitFLow2())
.end()
.build();// 让两个flow分别在各自的线程中异步执行
}
}

/pre>

h3>第四节 决策器的使用

/h3>

p>接口:JobExecutionDecider

/p>

pre class='language-javascript'>

package com.example.demo.batch;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

/** * @author hongtao.hao * @date 2019/7/3 */
public class MyDecider implements JobExecutionDecider {


private int count;

@Override
public FlowExecutionStatus decide(JobExecution arg0, StepExecution arg1) {

count++;
if (count % 2 == 0)
return new FlowExecutionStatus("even");
else
return new FlowExecutionStatus("odd");
}

}

/pre>

pre class='language-javascript'>

package com.example.demo.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** * @author hongtao.hao * @date 2019/7/3 */
@Configuration
@EnableBatchProcessing
public class DecisionDemo4 {


@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Step step1() {

return stepBuilderFactory.get("step1").tasklet(new Tasklet() {

@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {

System.out.println("step1");
return RepeatStatus.FINISHED;
}
}).build();
}

@Bean
public Step step2() {

return stepBuilderFactory.get("step2").tasklet(new Tasklet() {

@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {

System.out.println("step2");
return RepeatStatus.FINISHED;
}
}).build();
}

@Bean
public Step step3() {

return stepBuilderFactory.get("step3").tasklet(new Tasklet() {

@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {

System.out.println("step3");
return RepeatStatus.FINISHED;
}
}).build();
}

//TODO 创建决策器
@Bean
public JobExecutionDecider myDecider() {

return new MyDecider();
}

// 创建Job
@Bean
public Job DecisionDemoJob() {

return jobBuilderFactory.get("DecisionDemoJob")
.start(step1())
//next一个决策器
.next(myDecider())
//TODO 通过返回值决定走哪一个step
.from(myDecider()).on("even").to(step3())
.from(myDecider()).on("odd").to(step2())
//TODO 否则接着走决策器
.from(step2()).on("*").to(myDecider())
.end().build();
}
}

/pre>

h3>第五节 Job的嵌套

/h3>

p>一个Job可以嵌套在另一个Job中,被嵌套的Job称为子Job,外部Job称为父Job。


子Job不能单独执行,需要由父Job来启动

/p>

p>案例:创建两个Job,作为子Job,再创建一个Job作为父Job

/p>

p>@Configuration


@EnableBatchProcessing


class NestedJobDemo5 {


/p>

pre class='language-javascript'>

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
private Job childJobOne;

@Autowired
private Job childJobTwo;

@Autowired
private JobLauncher launcher;

@Bean
public Job parentJob(JobRepository repository,PlatformTransactionManager transactionManager)
{
return jobBuilderFactory.get("parentJob")
.start(childJob1(repository,transactionManager))
.next(childJob2(repository,transactionManager))
.build();
}

//返回的是Job类型的Step
private Step childJob2(JobRepository repository,PlatformTransactionManager transactionManager) {
return new JobStepBuilder(new StepBuilder("childJobTwo"))
.job(childJobTwo).launcher(launcher)
.repository(repository)
.transactionManager(transactionManager)
.build();
}

private Step childJob1(JobRepository repository,PlatformTransactionManager transactionManager) {
return new JobStepBuilder(new StepBuilder("childJobOne"))
.job(childJobOne).launcher(launcher)
.repository(repository)
.transactionManager(transactionManager)
.build();
}

/pre>

p>}

/p>

p>spring.batch.job.names=parentJob

/p>

h3>第六节监听器的使用

/h3>

p>用来监听批处理作业的执行情况

/p>

p>创建监听可以通过实现接口或使用注解

/p>

p>JobExecutionListener(before,after)

/p>

p>StepExecutionListener(before,after)

/p>

p>ChunkListener(before,after,error)

/p>

p>ItemReadListener,ItemProcessListener,ItemWriteListener(before,after,error)

/p>

p>public class MyJobListener implements JobExecutionListener{


/p>

pre class='language-javascript'>

@Override
public void beforeJob(JobExecution jobExecution) {
// TODO Auto-generated method stub
System.out.println(jobExecution.getJobInstance().getJobName()+"before...");
}

@Override
public void afterJob(JobExecution jobExecution) {
// TODO Auto-generated method stub
System.out.println(jobExecution.getJobInstance().getJobName()+"after...");
}

/pre>

p>}

/p>

p>public class MyChunkListener {


/p>

pre class='language-javascript'>

@BeforeChunk
public void beforeChunk(ChunkContext context){
System.out.println(context.getStepContext().getStepName()+"before...");
}

@AfterChunk
public void afterChunk(ChunkContext context){
System.out.println(context.getStepContext().getStepName()+"after...");
}

/pre>

p>}

/p>

h3>第七节Job参数

/h3>

p>@Configuration


@EnableBatchProcessing


public class JobParametersDemo7 implements StepExecutionListener{


/p>

pre class='language-javascript'>

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;


private Map parameters;


@Bean
public Job parametersJob()
{
return jobBuilderFactory.get("parametersJob")
.start(parameterStep())
.build();
}

//在step执行之前接收到参数,因为会在step中用到,所以使用监听
@Bean
public Step parameterStep() {
return stepBuilderFactory.get("parameterStep")
.listener(this)
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
System.out.println(parameters.get("name"));
return RepeatStatus.FINISHED;
}
}).build();
}


@Override
public void beforeStep(StepExecution stepExecution) {
parameters = stepExecution.getJobParameters().getParameters();
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}

}

第三章 数据输入

第一节 ItemReader概述

@Configuration

@EnableBatchProcessing

public class ItemReaderDemo8 {

@Autowired
private JobBuilderFactory jobBuilderFactroy;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job itemReaderJob()
{
return jobBuilderFactroy.get("itemReaderJob")
.start(itemReaderStep())
.build();
}

@Bean
public Step itemReaderStep() {
return stepBuilderFactory.get("itemReaderStep")
.chunk(2)
.reader(itemReaderDemoRead())
.writer(list->{
for(String item:list)
{
System.out.println(item+"...");
}
}).build();

}

@Bean
public MyReader itemReaderDemoRead() {
List data=Arrays.asList("cat","dog","pig","duck");
return new MyReader(data);
}

}

第二节 从数据库中读取数据

JdbcPagingItemReader

@Configuration

@EnableBatchProcessing

class DbJdbcDemo9 {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("dbJdbcWriter")
private ItemWriter dbJdbcWriter;

@Autowired
private DataSource dataSource;

@Bean
public Job dbJdbcJob()
{
return jobBuilderFactory.get("dbJdbcJob")
.start(dbJdbcStep())
.build();
}

@Bean
public Step dbJdbcStep() {
return stepBuilderFactory.get("dbJdbcStep")
.chunk(2)
.reader(dbJdbcReader())
.writer(dbJdbcWriter)
.build();
}

@Bean
@StepScope
public JdbcPagingItemReader dbJdbcReader() {

JdbcPagingItemReader reader=new JdbcPagingItemReader<>();
//使用JdbcPagingItemReader对象从数据库中读取数据
reader.setDataSource(dataSource);
reader.setFetchSize(2);
reader.setRowMapper(new RowMapper() {
@Override
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
User user=new User();
user.setId(rs.getInt(1));
user.setUsername(rs.getString(2));
user.setPassword(rs.getString(3));
user.setAge(rs.getInt(4));
return user;
}
});
//指定sql语句
MySqlPagingQueryProvider provider =new MySqlPagingQueryProvider();
provider.setSelectClause("id,username,password,age");
provider.setFromClause("from User");

Map sort=new HashMap<>(1);
sort.put("id", Order.ASCENDING);
provider.setSortKeys(sort);

reader.setQueryProvider(provider);
return reader;
}

}

第三节 从普通文件中读取数据

FlatFileItemReader

@Configuration

@EnableBatchProcessing

class FileDemo10 {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("fileWriter")
private ItemWriter fileWriter;


@Bean
public Job fileJob()
{
return jobBuilderFactory.get("fileJob")
.start(fileStep())
.build();
}

@Bean
public Step fileStep() {
return stepBuilderFactory.get("fileStep")
.chunk(100)
.reader(fileReader())
.writer(fileWriter)
.build();
}

@Bean
@StepScope
public FlatFileItemReader fileReader() {
FlatFileItemReader reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("customer.csv"));
reader.setLinesToSkip(1);//跳过第一行

//如何解析数据
DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
//制定四个表头字段
tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
//把一行映射为Customer对象
DefaultLineMapper mapper=new DefaultLineMapper<>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new FieldSetMapper() {
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
Customer customer =new Customer();
customer.setId(fieldSet.readLong("id"));
customer.setFirstName(fieldSet.readString("firstName"));
customer.setLastName(fieldSet.readString("lastName"));
customer.setBirthday(fieldSet.readString("birthday"));
return customer;
}
});

mapper.afterPropertiesSet();
reader.setLineMapper(mapper);

return reader;
}

第四节 从XML文件中读取数据

StaxEventItemReader

org.springframework spring-oxm com.thoughtworks.xstream xstream 1.4.7

@Configuration

@EnableBatchProcessing

public class XmlDemo11 {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("xmlWriter")
private ItemWriter xmlWriter;



@Bean
public Job xmlJob()
{
return jobBuilderFactory.get("xmlJob")
.start(xmlStep())
.build();
}

@Bean
public Step xmlStep() {
return stepBuilderFactory.get("xmlStep")
.chunk(20)
.reader(xmlReader())
.writer(xmlWriter)
.build();
}


@Bean
@StepScope
public StaxEventItemReader xmlReader() {
StaxEventItemReader reader = new StaxEventItemReader<>();
reader.setResource(new ClassPathResource("customer.xml"));
//指定需要处理的根标签
reader.setFragmentRootElementName("customer");
//加入oxm的依赖,把xml转成对象
XStreamMarshaller unmarshaller=new XStreamMarshaller();
Map map = new HashMap<>();
map.put("customer", Customer.class);
unmarshaller.setAliases(map);
reader.setUnmarshaller(unmarshaller);
return reader;
}

第五节 从多个文件中读取数据

MultiResourceItemReader

@Configuration

@EnableBatchProcessing

class MultiFileDemo12 {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("multiFileWriter")
private ItemWriter multiFileWriter;

@Value("classpath:/file*.csv")
private Resource[] fileResources;


@Bean
public Job multiFileJob()
{
return jobBuilderFactory.get("multiFileJob")
.start(multiFileStep())
.build();
}

@Bean
public Step multiFileStep() {
return stepBuilderFactory.get("multiFileStep")
.chunk(100)
.reader(multiFileReader())
.writer(multiFileWriter)
.build();
}

@Bean
@StepScope
public MultiResourceItemReader multiFileReader() {
MultiResourceItemReader reader =new MultiResourceItemReader();

reader.setDelegate(fileReader());
reader.setResources(fileResources);
return reader;
}

@Bean
@StepScope
public FlatFileItemReader fileReader() {
FlatFileItemReader reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("customer.csv"));
reader.setLinesToSkip(1);//跳过第一行

//如何解析数据
DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
//制定四个表头字段
tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
//把一行映射为Customer对象
DefaultLineMapper mapper=new DefaultLineMapper<>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new FieldSetMapper() {
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
Customer customer =new Customer();
customer.setId(fieldSet.readLong("id"));
customer.setFirstName(fieldSet.readString("firstName"));
customer.setLastName(fieldSet.readString("lastName"));
customer.setBirthday(fieldSet.readString("birthday"));
return customer;
}
});

mapper.afterPropertiesSet();
reader.setLineMapper(mapper);

return reader;
}

第六节 ItemReader异常处理及重启

@Component(“restartReader”)

public class RestartReader implements ItemStreamReader{

private FlatFileItemReader customerFlatFileItemReader=new FlatFileItemReader<>();
private Long curLine = 0L;
private boolean restart = false;
private ExecutionContext executionContext;

public RestartReader()
{
customerFlatFileItemReader.setResource(new ClassPathResource("restartDemo.csv"));
customerFlatFileItemReader.setLinesToSkip(1);//跳过第一行

//如何解析数据
DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
//制定四个表头字段
tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
//把一行映射为Customer对象
DefaultLineMapper mapper=new DefaultLineMapper<>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new FieldSetMapper() {
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
Customer customer =new Customer();
customer.setId(fieldSet.readLong("id"));
customer.setFirstName(fieldSet.readString("firstName"));
customer.setLastName(fieldSet.readString("lastName"));
customer.setBirthday(fieldSet.readString("birthday"));
return customer;
}
});

mapper.afterPropertiesSet();
customerFlatFileItemReader.setLineMapper(mapper);

}


@Override
public Customer read() throws Exception, UnexpectedInputException, ParseException,
NonTransientResourceException {

Customer customer = null;

this.curLine++;

if(restart){
customerFlatFileItemReader.setLinesToSkip(this.curLine.intValue()-1);
restart = false;
System.out.println("Start reading from line: " + this.curLine);
}

customerFlatFileItemReader.open(this.executionContext);
customer = customerFlatFileItemReader.read();


if (customer!=null && customer.getFirstName().equals("WrongName")) {
throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
}
return customer;
}

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.executionContext = executionContext;
if (executionContext.containsKey("curLine")){
this.curLine = executionContext.getLong("curLine");
this.restart = true;
}else{
this.curLine = 0L;
executionContext.put("curLine",this.curLine);
System.out.println("Start reading from line: " + this.curLine + 1);
}

}

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.put("curLine",this.curLine);
System.out.println("currentLine:"+this.curLine);
}

@Override
public void close() throws ItemStreamException {

}

}

第四章 数据输出

第一节 ItemWriter概述

ItemReader是一个数据一个数据的读,而
ItemWriter是一批一批的输出

@Component(“myWrite”)

public class MyWrite implements ItemWriter{

@Override
public void write(List items) throws Exception {
System.out.println(items.size());
for(String ss:items){
System.out.println(ss);
}
}

}

@Configuration
public class ItemWriterDemo {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("myWrite")
private ItemWriter myWrite;

@Bean
public Job itemWriterDemoJob()
{
return jobBuilderFactory.get("itemWriterDemoJob")
.start(itemWriterDemoStep())
.build();
}

@Bean
public Step itemWriterDemoStep() {

return stepBuilderFactory.get("itemWriterDemoStep")
.chunk(5)
.reader(myRead())
.writer(myWrite)
.build();
}

@Bean
public ItemReader myRead() {
List items=new ArrayList<>();
for(int i=1;i<=50;i++){
items.add("java"+i);
}
return new ListItemReader(items);
}

}

第二节 数据输出到数据库

Neo4jItemWriter

MongoItemWriter

RepositoryItemWriter

HibernateItemWriter

JdbcBatchItemWriter

JpaItemWriter

GemfireItemWriter

@Configuration

public class ItemWriterDbConfig {

@Autowired
private DataSource dataSource;

@Bean
public JdbcBatchItemWriter itemWriterDb()
{
JdbcBatchItemWriter writer=new JdbcBatchItemWriter();
writer.setDataSource(dataSource);
writer.setSql("insert into customer(id,firstName,lastName,birthday) values "+
"(:id,:firstName,:lastName,:birthday)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
return writer;
}

}

第三节 数据输出到普通文件

FlatFileItemWriter

案例:从数据库中读取数据写入到文件

@Configuration
public class DbFileWriterConfig {

@Bean
public FlatFileItemWriter dbFileWriter() throws Exception
{
FlatFileItemWriter writer=new FlatFileItemWriter();
String path = "e:\\customer.txt";
writer.setResource(new FileSystemResource(path));

//把一个 Customer对象转成一行字符串
writer.setLineAggregator(new LineAggregator() {

ObjectMapper mapper = new ObjectMapper();

@Override
public String aggregate(Customer item) {
String str = null;
try {
str=mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return str;
}
});
writer.afterPropertiesSet();
return writer;
}

}

第四节 数据输出到xml文件

StaxEvenItemWriter

@Configuration
public class XmlFileWriterConfig {

@Bean
public StaxEventItemWriter xmlFileWriter() throws Exception
{
StaxEventItemWriter writer=new StaxEventItemWriter();

XStreamMarshaller marshaller = new XStreamMarshaller();
Map aliases = new HashMap<>();
aliases.put("customer", Customer.class);
marshaller.setAliases(aliases);

writer.setRootTagName("customers");
writer.setMarshaller(marshaller);

String path="e:\\cus.xml";
writer.setResource(new FileSystemResource(path));
writer.afterPropertiesSet();

return writer;
}

}

第五节 数据输出到多个文件

CompositeItemWriter

ClassifierCompositeItemWriter

@Bean

public CompositeItemWriter multiFileItemWriter() throws Exception
{
CompositeItemWriter writer=new CompositeItemWriter();
writer.setDelegates(Arrays.asList(jsonFileWriter(),xmlFileWriter()));

writer.afterPropertiesSet();
return writer;
}


//实现分类

@Bean

public ClassifierCompositeItemWriter multiFileItemWriter() throws Exception
{
ClassifierCompositeItemWriter writer=new ClassifierCompositeItemWriter();

writer.setClassifier(new Classifier>() {
@Override
public ItemWriter classify(Customer customer) {
ItemWriter write=null;
try {
write=customer.getId()%2==0?jsonFileWriter():xmlFileWriter();
} catch (Exception e) {
e.printStackTrace();
}
return write;
}
});
return writer;
}

第六节 ItemProcessor的使用

ItemProcessor用于处理业务逻辑,验证,过滤等功能

CompositeItemProcessor

案例:从数据库中读取数据,然后对数据进行处理,最后输出到普通文件

@Bean

public CompositeItemProcessor process()
{
CompositeItemProcessor processor=new CompositeItemProcessor<>();

List> delegates=new ArrayList<>();
delegates.add(firstNameUpperProcessor);
delegates.add(idFilterProcessor);
processor.setDelegates(delegates);

return processor;
}

第五章 错误处理

第一节 错误处理概述

默认情况下当任务出现异常时,SpringBatch会
结束任务,当使相同的参数重启任务时,SpringBatch会去执行未执行的剩余任务.

@Bean

@StepScope

public Tasklet errorHandling() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Map stepExecutionContext = chunkContext.getStepContext().getStepExecutionContext();
if(stepExecutionContext.containsKey("qianfeng")){
System.out.println("The second run will success");
return RepeatStatus.FINISHED;
}
else{
System.out.println("The first run will fail");
chunkContext.getStepContext().getStepExecutionContext().put("qianfeng", true);
throw new RuntimeException("error ...");
}
}
};
}

第二节 错误重试(Retry)

@Bean

public Step retryDemoStep() {
return stepBuilderFactory.get("retryDemoStep")
.chunk(10)
.reader(reader())
.processor(retryItemProcessor)
.writer(retryItemWriter)
.faultTolerant()
.retry(CustomRetryException.class)
.retryLimit(10)
.build();
}

第三节 错误跳过(Skip)

@Bean

public Step skipDemoStep() {
return stepBuilderFactory.get("skipDemoStep")
.chunk(10)
.reader(reader())
.processor(skipItemProcessor)
.writer(skipItemWriter)
.faultTolerant()
.skip(CustomRetryException.class)
.skipLimit(10)
.build();
}

第四节 错误跳过监听器(Skip Listener)

@Bean

public Step skipListenerDemoStep1() {
return stepBuilderFactory.get("skipListenerDemoStep1")
.chunk(10)
.reader(reader())
.processor(skipItemProcessor)
.writer(skipItemWriter)
.faultTolerant()
.skip(CustomRetryException.class)
.skipLimit(10)
.listener(mySkipListener)
.build();
}

@Component

public class MySkipListener implements SkipListener {

@Override
public void onSkipInRead(Throwable t) {

}

@Override
public void onSkipInWrite(String item, Throwable t) {

}

@Override
public void onSkipInProcess(String item, Throwable t) {
System.out.println(item + " occur exception:" + t);
}

}

第六章 作业调度

第一节 JobLauncher的使用

控制任务什么时候启动

org.springframework.boot spring-boot-starter-web

程序启动时不执行任务:

spring.batch.job.enabled=false

@RestController

@RequestMapping(“/job”)

public class JobLauncherController {

@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job jobLauncherDemoJob;

@GetMapping("/{job1}")
public String job1Run(@PathVariable String job1) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException
{
System.out.println(job1);
JobParameters jobParameters=new JobParametersBuilder()
.addString("job1", job1)
.toJobParameters();
jobLauncher.run(jobLauncherDemoJob, jobParameters);

return "job1 success";
}

第二节 JobOperator的使用

@Configuration

public class JobOperatorDemo implements StepExecutionListener,ApplicationContextAware{

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;


private Map parameter;

@Autowired
private JobLauncher jobLauncher;

@Autowired
private JobRepository jobRepository;

@Autowired
private JobExplorer jobExplorer;

@Autowired
private JobRegistry jobRegistry;

private ApplicationContext context;


@Bean
public JobRegistryBeanPostProcessor jobRegistrar() throws Exception {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();

postProcessor.setJobRegistry(jobRegistry);
postProcessor.setBeanFactory(context.getAutowireCapableBeanFactory());
postProcessor.afterPropertiesSet();

return postProcessor;
}


@Bean
public JobOperator jobOperator(){
SimpleJobOperator operator = new SimpleJobOperator();

operator.setJobLauncher(jobLauncher);
operator.setJobParametersConverter(new DefaultJobParametersConverter());
operator.setJobRepository(jobRepository);
operator.setJobExplorer(jobExplorer);
operator.setJobRegistry(jobRegistry);

return operator;
}


@Bean
public Job jobOperatorDemoJob()
{
return jobBuilderFactory.get("jobOperatorDemoJob")
.start(jobOperatorDemoStep())
.build();
}

@Bean
public Step jobOperatorDemoStep() {
return stepBuilderFactory.get("jobOperatorDemoStep")
.listener(this)
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
System.out.println(parameter.get("msg").getValue());
return RepeatStatus.FINISHED;
}
}).build();
}



@Override
public void beforeStep(StepExecution stepExcution) {
parameter = stepExcution.getJobParameters().getParameters();

}

@Override
public ExitStatus afterStep(StepExecution arg0) {

return null;
}


@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context=applicationContext;
}

}

编程小号
上一篇 2025-01-19 15:33
下一篇 2025-01-19 15:27

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/hz/148219.html