博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Batch 快速入门
阅读量:6071 次
发布时间:2019-06-20

本文共 9361 字,大约阅读时间需要 31 分钟。

hot3.png

  1. Spring Batch 是用来处理大量数据操作的框架,主要用来读取大量数据,然后进行一定处理后输出成指定的形式。
  2. Spring Batch 主要组成部分
  • JobRepository:用来注册Job的容器
  • JobLauncher:用来启动Job的接口
  • Job:实际执行的任务,包含一个或多个Step
  • Step:Step包含ItemReader,ItemProcessor,ItemWriter
  • ItemReader:读取数据的接口
  • ItemProcessor:处理数据的接口
  • ItemWriter:输出数据的接口

以上Spring Batch 主要组成部分只需要注册成Spring的Bean即可,在配置类上使用@EnableBatchProcession注解开启批处理支持

package com.springbatch.config;import com.springbatch.batch.CsvBeanValidator;import com.springbatch.batch.CsvItemProcessor;import com.springbatch.batch.CsvJobListener;import com.springbatch.entity.Person;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.launch.support.RunIdIncrementer;import org.springframework.batch.core.launch.support.SimpleJobLauncher;import org.springframework.batch.core.repository.JobRepository;import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;import org.springframework.batch.item.database.JdbcBatchItemWriter;import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.batch.item.validator.Validator;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.ClassPathResource;import org.springframework.transaction.PlatformTransactionManager;import javax.sql.DataSource;/**Batch配置类 * Created by Administrator on 2016/12/8. */@Configuration//开启批处理支持@EnableBatchProcessingpublic class CsvBathConfig {    //ItemReader定义    @Bean    public ItemReader
reader() throws Exception { //使用FlatFileItemReader读取文件 FlatFileItemReader
reader = new FlatFileItemReader
(); //设置csv文件路径 reader.setResource(new ClassPathResource("people.csv")); //对csv文件的数据和领域模型类做对应映射 reader.setLineMapper(new DefaultLineMapper
(){
{ setLineTokenizer(new DelimitedLineTokenizer(){
{ setNames(new String[]{"name", "age", "nation", "address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper
(){
{ setTargetType(Person.class); }}); }}); return reader; } //ItemProcessor定义 @Bean public ItemProcessor
processor() { //采用自定义的ItemProcessor的实现 CsvItemProcessor processor = new CsvItemProcessor(); //指定自定义检验器 processor.setValidator(csvBeanValidator()); return processor; } //ItemWriter定义 @Bean public ItemWriter
writer(DataSource dataSource) {//自动注入dataSource //使用jdbc批处理的JdbcBatchItemWriter写数据到数据库 JdbcBatchItemWriter
writer = new JdbcBatchItemWriter
(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider
()); //要执行批处理的sql语句 String sql = "insert into person " + "(name,age,nation,address) " + "values (:name, :age, :nation, :address)"; writer.setSql(sql); writer.setDataSource(dataSource); return writer; } @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("mysql"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) .end() .listener(csvJobListener()) .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader
reader, ItemWriter
writer, ItemProcessor
processor) { return stepBuilderFactory .get("step1") .
chunk(65000)//每次提交65000条数据 .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator
csvBeanValidator() { return new CsvBeanValidator
(); }}

3.Job监听

    用于监听Job的执行情况,需定义一个类实现JobExecutionListener,并在定义Job的Bean上绑定该监听器

定义:

public class CsvJobListener implements JobExecutionListener {    long startTime;    long endTime;    @Override    public void beforeJob(JobExecution jobExecution) {        startTime = System.currentTimeMillis();        System.out.println("任务处理开始");    }    @Override    public void afterJob(JobExecution jobExecution) {        endTime = System.currentTimeMillis();        System.out.println("任务处理结束");        System.out.println("耗时:" + (endTime - startTime) + "ms");    }}

注册:

@Beanpublic Job importJob(JobBuilderFactory jobs, Step s1) {    return jobs.get("importJob")            .incrementer(new RunIdIncrementer())            .flow(s1)            .end()            .listener(csvJobListener())            .build();}
@Beanpublic CsvJobListener csvJobListener() {    return new CsvJobListener();}

4.数据读取

    Spring Batch 提供了大量的ItemReader实现

5.数据处理和校验

  • 通过ItemProcessor接口实现来完成
/** * 数据处理 * Created by Administrator on 2016/12/8. */public class CsvItemProcessor extends ValidatingItemProcessor
{ @Override public Person process(Person item) throws ValidationException { super.process(item);//调用自定义校验器 //简单数据处理 if (item.getNation().equals("汉族")) { item.setNation("01"); } else { item.setNation("02"); } return item; }}

 

/** * 数据校验 * Created by Administrator on 2016/12/8. */public class CsvBeanValidator
implements Validator
, InitializingBean { private javax.validation.Validator validator; //Validator初始化 @Override public void afterPropertiesSet() throws Exception { ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.usingContext().getValidator(); } @Override public void validate(T value) throws ValidationException { //使用Validator的validate方法校验 Set
> constraintViolations = validator.validate(value); if (constraintViolations.size() > 0 ) { StringBuilder message = new StringBuilder(); for (ConstraintViolation
constraintViolation : constraintViolations) { message.append(constraintViolation.getMessage() + "\n"); } throw new ValidationException(message.toString()); } }}

定义ItemProcessor时把自定义校验器设置进去

//ItemProcessor定义@Bean    public ItemProcessor
processor() { //采用自定义的ItemProcessor的实现 CsvItemProcessor processor = new CsvItemProcessor(); //指定自定义检验器 processor.setValidator(csvBeanValidator()); return processor;}
@Beanpublic Validator
csvBeanValidator() { return new CsvBeanValidator
();}

6.数据输出,Spring Batch 提供了大量的ItemWriter的实现

7.计划任务

    只需在普通计划任务方法中只需JobLauncher的run方法即可

8.参数后置绑定

    可以在JobParameters中绑定参数,定义Bean时使用@StepScope注解,然后通过@Value注入此参数

参数设置:

@RequestMapping("/imp")public String imp(String fileName) throws Exception {    String path = fileName + ".csv";    jobParameters = new JobParametersBuilder()            .addLong("time", System.currentTimeMillis())            .addString("input.file.name", path)            .toJobParameters();    jobLauncher.run(importJob, jobParameters);    return "ok";}

定义Bean:

@Configuration@EnableBatchProcessingpublic class TriggerBatchConfig {    //ItemReader定义    @Bean    @StepScope    public FlatFileItemReader
reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception { //使用FlatFileItemReader读取文件 FlatFileItemReader
reader = new FlatFileItemReader
(); //设置csv文件路径 reader.setResource(new ClassPathResource(pathToFile)); //对csv文件的数据和领域模型类做对应映射 reader.setLineMapper(new DefaultLineMapper
(){
{ setLineTokenizer(new DelimitedLineTokenizer(){
{ setNames(new String[]{"name", "age", "nation", "address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper
(){
{ setTargetType(Person.class); }}); }}); return reader; }

源码:https://git.oschina.net/NeedLoser/Spring-Boot.git 的springbatch项目

转载于:https://my.oschina.net/NeedLoser/blog/803034

你可能感兴趣的文章
史上最长的电话报修
查看>>
python字符串格式化
查看>>
4月26日
查看>>
我的友情链接
查看>>
移动广告平台:KeyMob广告
查看>>
Android中常用的五种布局
查看>>
mongodb Explain and Index
查看>>
构建内网的MySQL的yum源
查看>>
Ansible之十一:变量详解
查看>>
LeetCode283. Move ZeroesC语言
查看>>
Loadrunner进行md5加密方法
查看>>
Essential Grid for ASP.NET MVC
查看>>
Mobiscroll 三级联动地区选择
查看>>
使用kubeadm部署k8s集群00-缓存gcr.io镜像
查看>>
策略模式Strategy (分离算法,选择实现)
查看>>
Server 2012私有云之高可用——”瑞友杯”虚拟化征文
查看>>
django新建支持中文mysql数据库
查看>>
html之marquee详解
查看>>
十个糟糕的程序员的行为
查看>>
《淘宝技术这十年》笔记 (大图,手机勿入)
查看>>