Chapter 1: Getting Started with Spring Batch

Section 1: Spring Batch Overview

Spring Batch is a lightweight, comprehensive batch framework designed to help enterprises build robust, efficient batch applications.

Spring Batch provides a large set of reusable components, including logging, tracing, transaction management, job statistics, job restart, skip, retry, and resource management. For high-volume, high-performance batch jobs it also offers partitioning and remote processing.

Spring Batch is a batch application framework, not a scheduling framework; it must be paired with a scheduler to build a complete batch solution. It focuses only on batch concerns such as transactions, concurrency, monitoring, and execution, and provides no scheduling of its own. It is typically combined with a scheduler such as Quartz.

The framework's main features:
Transaction management
Chunk-based processing
Declarative I/O
Start/Stop/Restart
Retry/Skip

The framework has four main roles. JobLauncher is the job launcher: jobs are started through it, so it can be seen as the program's entry point. A Job represents one concrete batch job. A Step represents one concrete step; a Job can contain multiple Steps (think about how many steps it takes to put an elephant into a fridge and you get the idea). JobRepository is where state is stored; it can be seen as an interface to a database, used to record job status and other metadata while a job runs.
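The four roles above can be seen together in the smallest possible configuration. This is only a sketch: the class, bean, and step names are illustrative, and the JobBuilderFactory/StepBuilderFactory beans come from @EnableBatchProcessing, just as in the demos later in this chapter. The JobLauncher and JobRepository are provided by the framework; we only declare the Job and its single Step.

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Minimal sketch (illustrative names): one Job containing one Step.
@Configuration
@EnableBatchProcessing
public class HelloJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // A Step: the smallest unit of work inside a Job.
    @Bean
    public Step helloStep() {
        return stepBuilderFactory.get("helloStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("hello spring batch");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    // A Job: one or more Steps; it is started by the JobLauncher,
    // and its run state is recorded in the JobRepository.
    @Bean
    public Job helloJob() {
        return jobBuilderFactory.get("helloJob")
                .start(helloStep())
                .build();
    }
}
```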
Section 2: Setting Up a Spring Batch Project

Generate a project at https://start.spring.io/ and add the following dependencies to pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
                .start(jobFlowStep1())
                .next(jobFlowStep2())
                .build();
    }

    // Create the Job
    @Bean
    public Job jobFlowDemo2Job() {
        return jobBuilderFactory.get("jobFlowDemo2Job")
                .start(jobFlowDemo2Flow())
                // TODO continue with a plain step after the flow
                .next(jobFlowStep3())
                .end()
                .build();
    }
}
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * @author hongtao.hao
 * @date 2019/7/3
 */
@Configuration
@EnableBatchProcessing
public class JobSplitDemo3 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step jobSplitStep1() {
        return stepBuilderFactory.get("jobSplitStep1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext chunkContext) throws Exception {
                System.out.println(chunkContext.getStepContext().getStepName() + "," + Thread.currentThread().getName());
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step jobSplitStep2() {
        return stepBuilderFactory.get("jobSplitStep2").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext chunkContext) throws Exception {
                System.out.println(chunkContext.getStepContext().getStepName() + "," + Thread.currentThread().getName());
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Step jobSplitStep3() {
        return stepBuilderFactory.get("jobSplitStep3").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution arg0, ChunkContext chunkContext) throws Exception {
                System.out.println(chunkContext.getStepContext().getStepName() + "," + Thread.currentThread().getName());
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Flow jobSplitFLow1() {
        return new FlowBuilder<Flow>("jobSplitFLow1").start(jobSplitStep1()).build();
    }

    @Bean
    public Flow jobSplitFLow2() {
        return new FlowBuilder<Flow>("jobSplitFLow2")
                .start(jobSplitStep2())
                .next(jobSplitStep3())
                .build();
    }

    @Bean
    public Job jobSplitJob() {
        return jobBuilderFactory.get("jobSplitJob")
                .start(jobSplitFLow1())
                // TODO use split so jobSplitFLow1 and jobSplitFLow2 run concurrently
                .split(new SimpleAsyncTaskExecutor())
                .add(jobSplitFLow2())
                .end()
                .build(); // each flow runs asynchronously on its own thread
    }
}
Section 4: Using a Decider

Interface: JobExecutionDecider
package com.example.demo.batch;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
/**
 * @author hongtao.hao
 * @date 2019/7/3
 */
public class MyDecider implements JobExecutionDecider {
private int count;
@Override
public FlowExecutionStatus decide(JobExecution arg0, StepExecution arg1) {
count++;
if (count % 2 == 0)
return new FlowExecutionStatus("even");
else
return new FlowExecutionStatus("odd");
}
}
package com.example.demo.batch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author hongtao.hao
 * @date 2019/7/3
 */
@Configuration
@EnableBatchProcessing
public class DecisionDemo4 {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
System.out.println("step1");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
System.out.println("step2");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
System.out.println("step3");
return RepeatStatus.FINISHED;
}
}).build();
}
    // TODO create the decider
    @Bean
    public JobExecutionDecider myDecider() {
        return new MyDecider();
    }

    // Create the Job
    @Bean
    public Job decisionDemoJob() {
        return jobBuilderFactory.get("DecisionDemoJob")
                .start(step1())
                // next to a decider
                .next(myDecider())
                // TODO route to a step based on the decider's return value
                .from(myDecider()).on("even").to(step3())
                .from(myDecider()).on("odd").to(step2())
                // TODO after step2, run the decider again
                .from(step2()).on("*").to(myDecider())
                .end().build();
    }
}
Section 5: Nesting Jobs

A Job can be nested inside another Job. The nested Job is called the child Job, and the outer Job the parent Job. A child Job cannot run on its own; it must be started by its parent.

Example: create two Jobs as child Jobs, then create another Job as their parent.

@Configuration
@EnableBatchProcessing
public class NestedJobDemo5 {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Job childJobOne;
@Autowired
private Job childJobTwo;
@Autowired
private JobLauncher launcher;
@Bean
public Job parentJob(JobRepository repository,PlatformTransactionManager transactionManager)
{
return jobBuilderFactory.get("parentJob")
.start(childJob1(repository,transactionManager))
.next(childJob2(repository,transactionManager))
.build();
}
    // returns a Step that wraps a Job
private Step childJob2(JobRepository repository,PlatformTransactionManager transactionManager) {
return new JobStepBuilder(new StepBuilder("childJobTwo"))
.job(childJobTwo).launcher(launcher)
.repository(repository)
.transactionManager(transactionManager)
.build();
}
private Step childJob1(JobRepository repository,PlatformTransactionManager transactionManager) {
return new JobStepBuilder(new StepBuilder("childJobOne"))
.job(childJobOne).launcher(launcher)
.repository(repository)
.transactionManager(transactionManager)
.build();
    }
}

In application.properties, launch only the parent Job at startup:

spring.batch.job.names=parentJob
Section 6: Using Listeners

Listeners are used to monitor the execution of a batch job.

A listener can be created by implementing an interface or by using annotations:

JobExecutionListener (before, after)
StepExecutionListener (before, after)
ChunkListener (before, after, error)
ItemReadListener, ItemProcessListener, ItemWriteListener (before, after, error)

public class MyJobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName() + "before...");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName() + "after...");
    }
}

public class MyChunkListener {
@BeforeChunk
public void beforeChunk(ChunkContext context){
System.out.println(context.getStepContext().getStepName()+"before...");
}
@AfterChunk
public void afterChunk(ChunkContext context){
System.out.println(context.getStepContext().getStepName()+"after...");
    }
}
Section 7: Job Parameters

@Configuration
@EnableBatchProcessing
public class JobParametersDemo7 implements StepExecutionListener {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private Map<String, JobParameter> parameters;
@Bean
public Job parametersJob()
{
return jobBuilderFactory.get("parametersJob")
.start(parameterStep())
.build();
}
    // receive the parameters before the step executes; a listener is used because the step needs them
@Bean
public Step parameterStep() {
return stepBuilderFactory.get("parameterStep")
.listener(this)
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
System.out.println(parameters.get("name"));
return RepeatStatus.FINISHED;
}
}).build();
}
@Override
public void beforeStep(StepExecution stepExecution) {
parameters = stepExecution.getJobParameters().getParameters();
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}
Chapter 3: Data Input

Section 1: ItemReader Overview
@Configuration
@EnableBatchProcessing
public class ItemReaderDemo8 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job itemReaderJob() {
        return jobBuilderFactory.get("itemReaderJob")
                .start(itemReaderStep())
                .build();
    }

    @Bean
    public Step itemReaderStep() {
        return stepBuilderFactory.get("itemReaderStep")
                .<String, String>chunk(2)
                .reader(itemReaderDemoRead())
                .writer(list -> {
                    for (String item : list) {
                        System.out.println(item + "...");
                    }
                }).build();
    }

    @Bean
    public MyReader itemReaderDemoRead() {
        List<String> data = Arrays.asList("cat", "dog", "pig", "duck");
        return new MyReader(data);
    }
}
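MyReader is used above but never shown. Its logic is simply "hand back one item per read() call, and signal the end of input with null". Here is a framework-free sketch of that contract under that assumption (the real class would implement Spring Batch's ItemReader<String> interface, whose read() method has exactly this behavior):

```java
import java.util.Iterator;
import java.util.List;

// Framework-free stand-in for the MyReader referenced above (assumed
// implementation): one item per call, null once the data is exhausted.
class MyReader {
    private final Iterator<String> it;

    MyReader(List<String> data) {
        this.it = data.iterator();
    }

    String read() {
        return it.hasNext() ? it.next() : null; // null ends the step's reading
    }
}
```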
Section 2: Reading Data from a Database

JdbcPagingItemReader
@Configuration
@EnableBatchProcessing
public class DbJdbcDemo9 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("dbJdbcWriter")
    private ItemWriter<? super User> dbJdbcWriter;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job dbJdbcJob() {
        return jobBuilderFactory.get("dbJdbcJob")
                .start(dbJdbcStep())
                .build();
    }

    @Bean
    public Step dbJdbcStep() {
        return stepBuilderFactory.get("dbJdbcStep")
                .<User, User>chunk(2)
                .reader(dbJdbcReader())
                .writer(dbJdbcWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<User> dbJdbcReader() {
        // read from the database with a JdbcPagingItemReader
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(2);
        reader.setRowMapper(new RowMapper<User>() {
            @Override
            public User mapRow(ResultSet rs, int rowNum) throws SQLException {
                User user = new User();
                user.setId(rs.getInt(1));
                user.setUsername(rs.getString(2));
                user.setPassword(rs.getString(3));
                user.setAge(rs.getInt(4));
                return user;
            }
        });
        // specify the SQL query
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("id,username,password,age");
        provider.setFromClause("from User");
        Map<String, Order> sort = new HashMap<>(1);
        sort.put("id", Order.ASCENDING);
        provider.setSortKeys(sort);
        reader.setQueryProvider(provider);
        return reader;
    }
}
Section 3: Reading Data from a Flat File

FlatFileItemReader
@Configuration
@EnableBatchProcessing
public class FileDemo10 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("fileWriter")
    private ItemWriter<? super Customer> fileWriter;

    @Bean
    public Job fileJob() {
        return jobBuilderFactory.get("fileJob")
                .start(fileStep())
                .build();
    }

    @Bean
    public Step fileStep() {
        return stepBuilderFactory.get("fileStep")
                .<Customer, Customer>chunk(100)
                .reader(fileReader())
                .writer(fileWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> fileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1); // skip the header line
        // how to tokenize each line
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        // the four header fields
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthday"});
        // map each line to a Customer object
        DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        mapper.setFieldSetMapper(new FieldSetMapper<Customer>() {
            @Override
            public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
                Customer customer = new Customer();
                customer.setId(fieldSet.readLong("id"));
                customer.setFirstName(fieldSet.readString("firstName"));
                customer.setLastName(fieldSet.readString("lastName"));
                customer.setBirthday(fieldSet.readString("birthday"));
                return customer;
            }
        });
        mapper.afterPropertiesSet();
        reader.setLineMapper(mapper);
        return reader;
    }
}
Section 4: Reading Data from an XML File

StaxEventItemReader

Required dependencies:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.7</version>
</dependency>
@Configuration
@EnableBatchProcessing
public class XmlDemo11 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("xmlWriter")
    private ItemWriter<? super Customer> xmlWriter;

    @Bean
    public Job xmlJob() {
        return jobBuilderFactory.get("xmlJob")
                .start(xmlStep())
                .build();
    }

    @Bean
    public Step xmlStep() {
        return stepBuilderFactory.get("xmlStep")
                .<Customer, Customer>chunk(20)
                .reader(xmlReader())
                .writer(xmlWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("customer.xml"));
        // the root element of each fragment to process
        reader.setFragmentRootElementName("customer");
        // with the spring-oxm dependency, unmarshal each XML fragment into an object
        XStreamMarshaller unmarshaller = new XStreamMarshaller();
        Map<String, Class<?>> map = new HashMap<>();
        map.put("customer", Customer.class);
        unmarshaller.setAliases(map);
        reader.setUnmarshaller(unmarshaller);
        return reader;
    }
}
Section 5: Reading Data from Multiple Files

MultiResourceItemReader
@Configuration
@EnableBatchProcessing
public class MultiFileDemo12 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("multiFileWriter")
    private ItemWriter<? super Customer> multiFileWriter;

    @Value("classpath:/file*.csv")
    private Resource[] fileResources;

    @Bean
    public Job multiFileJob() {
        return jobBuilderFactory.get("multiFileJob")
                .start(multiFileStep())
                .build();
    }

    @Bean
    public Step multiFileStep() {
        return stepBuilderFactory.get("multiFileStep")
                .<Customer, Customer>chunk(100)
                .reader(multiFileReader())
                .writer(multiFileWriter)
                .build();
    }

    @Bean
    @StepScope
    public MultiResourceItemReader<Customer> multiFileReader() {
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
        reader.setDelegate(fileReader());
        reader.setResources(fileResources);
        return reader;
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> fileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1); // skip the header line
        // how to tokenize each line
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        // the four header fields
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthday"});
        // map each line to a Customer object
        DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        mapper.setFieldSetMapper(new FieldSetMapper<Customer>() {
            @Override
            public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
                Customer customer = new Customer();
                customer.setId(fieldSet.readLong("id"));
                customer.setFirstName(fieldSet.readString("firstName"));
                customer.setLastName(fieldSet.readString("lastName"));
                customer.setBirthday(fieldSet.readString("birthday"));
                return customer;
            }
        });
        mapper.afterPropertiesSet();
        reader.setLineMapper(mapper);
        return reader;
    }
}
Section 6: ItemReader Exception Handling and Restart
@Component("restartReader")
public class RestartReader implements ItemStreamReader<Customer> {

    private FlatFileItemReader<Customer> customerFlatFileItemReader = new FlatFileItemReader<>();
private Long curLine = 0L;
private boolean restart = false;
private ExecutionContext executionContext;
public RestartReader()
{
        customerFlatFileItemReader.setResource(new ClassPathResource("restartDemo.csv"));
        customerFlatFileItemReader.setLinesToSkip(1); // skip the header line
        // how to tokenize each line
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        // the four header fields
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthday"});
        // map each line to a Customer object
        DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        mapper.setFieldSetMapper(new FieldSetMapper<Customer>() {
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
Customer customer =new Customer();
customer.setId(fieldSet.readLong("id"));
customer.setFirstName(fieldSet.readString("firstName"));
customer.setLastName(fieldSet.readString("lastName"));
customer.setBirthday(fieldSet.readString("birthday"));
return customer;
}
});
mapper.afterPropertiesSet();
customerFlatFileItemReader.setLineMapper(mapper);
}
@Override
public Customer read() throws Exception, UnexpectedInputException, ParseException,
NonTransientResourceException {
Customer customer = null;
this.curLine++;
if(restart){
customerFlatFileItemReader.setLinesToSkip(this.curLine.intValue()-1);
restart = false;
System.out.println("Start reading from line: " + this.curLine);
}
customerFlatFileItemReader.open(this.executionContext);
customer = customerFlatFileItemReader.read();
if (customer!=null && customer.getFirstName().equals("WrongName")) {
throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
}
return customer;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.executionContext = executionContext;
if (executionContext.containsKey("curLine")){
this.curLine = executionContext.getLong("curLine");
this.restart = true;
}else{
this.curLine = 0L;
executionContext.put("curLine",this.curLine);
            System.out.println("Start reading from line: " + (this.curLine + 1));
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.put("curLine",this.curLine);
System.out.println("currentLine:"+this.curLine);
}
@Override
public void close() throws ItemStreamException {
}
}
Chapter 4: Data Output

Section 1: ItemWriter Overview

ItemReader reads one item at a time, while ItemWriter writes a whole chunk of items at a time.
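That read-one/write-many rhythm is the heart of chunk-oriented processing. A framework-free sketch of the loop (the chunk size and item type are illustrative):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Framework-free sketch of chunk-oriented processing: the reader is pulled
// one item at a time, but the writer receives a whole list per chunk.
class ChunkLoop {
    static List<List<String>> run(List<String> source, int chunkSize) {
        Iterator<String> reader = source.iterator();
        List<List<String>> written = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(reader.next());                // read(): one item per call
            if (chunk.size() == chunkSize) {
                written.add(new ArrayList<>(chunk)); // write(): a whole chunk
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            written.add(new ArrayList<>(chunk));     // final partial chunk
        }
        return written;
    }
}
```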
@Component("myWrite")
public class MyWrite implements ItemWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {
System.out.println(items.size());
for(String ss:items){
System.out.println(ss);
}
}
}
@Configuration
public class ItemWriterDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("myWrite")
    private ItemWriter<? super String> myWrite;
@Bean
public Job itemWriterDemoJob()
{
return jobBuilderFactory.get("itemWriterDemoJob")
.start(itemWriterDemoStep())
.build();
}
@Bean
public Step itemWriterDemoStep() {
return stepBuilderFactory.get("itemWriterDemoStep")
                .<String, String>chunk(5)
.reader(myRead())
.writer(myWrite)
.build();
}
    @Bean
    public ItemReader<String> myRead() {
        List<String> items = new ArrayList<>();
        for (int i = 1; i <= 50; i++) {
            items.add("java" + i);
        }
        return new ListItemReader<>(items);
    }
}
Section 2: Writing Data to a Database
Neo4jItemWriter
MongoItemWriter
RepositoryItemWriter
HibernateItemWriter
JdbcBatchItemWriter
JpaItemWriter
GemfireItemWriter
@Configuration
public class ItemWriterDbConfig {
@Autowired
private DataSource dataSource;
    @Bean
    public JdbcBatchItemWriter<Customer> itemWriterDb() {
        JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("insert into customer(id,firstName,lastName,birthday) values " +
                "(:id,:firstName,:lastName,:birthday)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }
}
Section 3: Writing Data to a Flat File

FlatFileItemWriter

Example: read data from the database and write it to a file.
@Configuration
public class DbFileWriterConfig {
    @Bean
    public FlatFileItemWriter<Customer> dbFileWriter() throws Exception {
        FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
        String path = "e:\\customer.txt";
        writer.setResource(new FileSystemResource(path));
        // serialize each Customer object to one line of text
        writer.setLineAggregator(new LineAggregator<Customer>() {
ObjectMapper mapper = new ObjectMapper();
@Override
public String aggregate(Customer item) {
String str = null;
try {
str=mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return str;
}
});
writer.afterPropertiesSet();
return writer;
}
}
Section 4: Writing Data to an XML File

StaxEventItemWriter
@Configuration
public class XmlFileWriterConfig {
    @Bean
    public StaxEventItemWriter<Customer> xmlFileWriter() throws Exception {
        StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class<?>> aliases = new HashMap<>();
aliases.put("customer", Customer.class);
marshaller.setAliases(aliases);
writer.setRootTagName("customers");
writer.setMarshaller(marshaller);
String path="e:\\cus.xml";
writer.setResource(new FileSystemResource(path));
writer.afterPropertiesSet();
return writer;
}
}
Section 5: Writing Data to Multiple Files

CompositeItemWriter
ClassifierCompositeItemWriter
@Bean
public CompositeItemWriter<Customer> multiFileItemWriter() throws Exception {
    CompositeItemWriter<Customer> writer = new CompositeItemWriter<>();
    writer.setDelegates(Arrays.asList(jsonFileWriter(), xmlFileWriter()));
    writer.afterPropertiesSet();
    return writer;
}

// route each item to a writer by classification
@Bean
public ClassifierCompositeItemWriter<Customer> classifierMultiFileItemWriter() throws Exception {
    ClassifierCompositeItemWriter<Customer> writer = new ClassifierCompositeItemWriter<>();
    writer.setClassifier(new Classifier<Customer, ItemWriter<? super Customer>>() {
        @Override
        public ItemWriter<? super Customer> classify(Customer customer) {
            ItemWriter<Customer> write = null;
            try {
                write = customer.getId() % 2 == 0 ? jsonFileWriter() : xmlFileWriter();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return write;
        }
    });
    return writer;
}
Section 6: Using ItemProcessor

ItemProcessor handles business logic such as transformation, validation, and filtering.

CompositeItemProcessor

Example: read data from the database, process it, then write it to a flat file.
@Bean
public CompositeItemProcessor<Customer, Customer> process() {
    CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>();
    List<ItemProcessor<Customer, Customer>> delegates = new ArrayList<>();
    delegates.add(firstNameUpperProcessor);
    delegates.add(idFilterProcessor);
    processor.setDelegates(delegates);
    return processor;
}
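The two delegates above (firstNameUpperProcessor, idFilterProcessor) are referenced but not shown. Their logic can be sketched framework-free under reasonable assumptions; the real classes would implement Spring Batch's ItemProcessor<Customer, Customer>, where returning null tells the framework to filter the item out. The minimal Customer here carries only the fields the sketch needs:

```java
import java.util.Locale;

// Assumed minimal Customer for this sketch.
class Customer {
    long id;
    String firstName;
    Customer(long id, String firstName) { this.id = id; this.firstName = firstName; }
}

// firstNameUpperProcessor: upper-cases the first name; never drops items.
class FirstNameUpperProcessor {
    Customer process(Customer c) {
        c.firstName = c.firstName.toUpperCase(Locale.ROOT);
        return c;
    }
}

// idFilterProcessor: returning null filters the item out; here only
// even ids survive (illustrative rule).
class IdFilterProcessor {
    Customer process(Customer c) {
        return c.id % 2 == 0 ? c : null;
    }
}
```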
Chapter 5: Error Handling

Section 1: Error Handling Overview

By default, when a job hits an exception, Spring Batch ends the job. When the job is restarted with the same parameters, Spring Batch executes only the remaining, unfinished work.
@Bean
@StepScope
public Tasklet errorHandling() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            Map<String, Object> stepExecutionContext = chunkContext.getStepContext().getStepExecutionContext();
if(stepExecutionContext.containsKey("qianfeng")){
System.out.println("The second run will success");
return RepeatStatus.FINISHED;
}
else{
System.out.println("The first run will fail");
chunkContext.getStepContext().getStepExecutionContext().put("qianfeng", true);
throw new RuntimeException("error ...");
}
}
};
}
Section 2: Retrying Errors (Retry)
@Bean
public Step retryDemoStep() {
return stepBuilderFactory.get("retryDemoStep")
            .<String, String>chunk(10)
.reader(reader())
.processor(retryItemProcessor)
.writer(retryItemWriter)
.faultTolerant()
.retry(CustomRetryException.class)
.retryLimit(10)
.build();
}
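The semantics of faultTolerant().retry(...).retryLimit(10) can be mimicked framework-free: re-invoke the failing operation on the same item, but only for the declared exception type and only up to the limit. CustomRetryException stands in for the custom exception class assumed by the step above; the flaky processor that fails twice then succeeds is purely illustrative:

```java
// Framework-free sketch of retry-with-limit semantics (assumed names).
class CustomRetryException extends RuntimeException {
    CustomRetryException(String msg) { super(msg); }
}

class RetryingProcessor {
    private int attempts = 0;

    // Fails the first two calls, then succeeds -- simulating a flaky item.
    String processOnce(String item) {
        attempts++;
        if (attempts < 3) {
            throw new CustomRetryException("attempt " + attempts + " failed");
        }
        return item.toUpperCase();
    }

    // What retry(CustomRetryException.class).retryLimit(limit) does for one
    // item: retry only the declared exception type, up to the limit.
    String processWithRetry(String item, int limit) {
        for (int tryNo = 1; tryNo <= limit; tryNo++) {
            try {
                return processOnce(item);
            } catch (CustomRetryException e) {
                if (tryNo == limit) {
                    throw e; // limit exhausted: the step fails
                }
            }
        }
        throw new IllegalStateException("unreachable");
    }
}
```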
Section 3: Skipping Errors (Skip)
@Bean
public Step skipDemoStep() {
return stepBuilderFactory.get("skipDemoStep")
            .<String, String>chunk(10)
.reader(reader())
.processor(skipItemProcessor)
.writer(skipItemWriter)
.faultTolerant()
.skip(CustomRetryException.class)
.skipLimit(10)
.build();
}
Section 4: Skip Listener
@Bean
public Step skipListenerDemoStep1() {
return stepBuilderFactory.get("skipListenerDemoStep1")
            .<String, String>chunk(10)
.reader(reader())
.processor(skipItemProcessor)
.writer(skipItemWriter)
.faultTolerant()
.skip(CustomRetryException.class)
.skipLimit(10)
.listener(mySkipListener)
.build();
}
@Component
public class MySkipListener implements SkipListener<String, String> {
@Override
public void onSkipInRead(Throwable t) {
}
@Override
public void onSkipInWrite(String item, Throwable t) {
}
@Override
public void onSkipInProcess(String item, Throwable t) {
System.out.println(item + " occur exception:" + t);
}
}
Chapter 6: Job Scheduling

Section 1: Using JobLauncher

JobLauncher controls when a job starts. Add the web starter:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

To prevent jobs from running automatically at application startup:

spring.batch.job.enabled=false
@RestController
@RequestMapping("/job")
public class JobLauncherController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job jobLauncherDemoJob;

    @GetMapping("/{job1}")
    public String job1Run(@PathVariable String job1) throws JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        System.out.println(job1);
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("job1", job1)
                .toJobParameters();
        jobLauncher.run(jobLauncherDemoJob, jobParameters);
        return "job1 success";
    }
}
Section 2: Using JobOperator
@Configuration
public class JobOperatorDemo implements StepExecutionListener, ApplicationContextAware {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
    private Map<String, JobParameter> parameter;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobRepository jobRepository;
@Autowired
private JobExplorer jobExplorer;
@Autowired
private JobRegistry jobRegistry;
private ApplicationContext context;
@Bean
public JobRegistryBeanPostProcessor jobRegistrar() throws Exception {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
postProcessor.setBeanFactory(context.getAutowireCapableBeanFactory());
postProcessor.afterPropertiesSet();
return postProcessor;
}
@Bean
public JobOperator jobOperator(){
SimpleJobOperator operator = new SimpleJobOperator();
operator.setJobLauncher(jobLauncher);
operator.setJobParametersConverter(new DefaultJobParametersConverter());
operator.setJobRepository(jobRepository);
operator.setJobExplorer(jobExplorer);
operator.setJobRegistry(jobRegistry);
return operator;
}
@Bean
public Job jobOperatorDemoJob()
{
return jobBuilderFactory.get("jobOperatorDemoJob")
.start(jobOperatorDemoStep())
.build();
}
@Bean
public Step jobOperatorDemoStep() {
return stepBuilderFactory.get("jobOperatorDemoStep")
.listener(this)
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
System.out.println(parameter.get("msg").getValue());
return RepeatStatus.FINISHED;
}
}).build();
}
    @Override
    public void beforeStep(StepExecution stepExecution) {
        parameter = stepExecution.getJobParameters().getParameters();
    }
@Override
public ExitStatus afterStep(StepExecution arg0) {
return null;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context=applicationContext;
}
}