admin 管理员组文章数量: 887063
2024年1月16日发(作者:常见的ascii码表)
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
Spring Batch
作 者:孤旅者
Blog:/gulvzhe
整理者:大海
声 明:此系列博文出自博客园,版权归原作者和博客园所有,转载请注明出处。只能用于学习之用。任何版权问题整理者概不负责!
Spring Batch系列总括
最近一个项目在使用SpringBatch框架做一个电子商务平台的批处理。网上资料很有限,尤其是中文资料更是少之又少,官网上的文档也只是讲一些入门的基础知识,大部分高级特性都是一笔带过,讲解的很不彻底,在实际开发中碰到的问题很多。因此,特将自己学习、应用Spring Batch的过程总结成一个个小实例写成随笔。一是备忘,二是抛砖引玉,希望更多的高手能参与进来,指出其中的不足和提出自己的见解,大家共通讨论学习。
写过的关于SpringBatch的随笔主要有以下几篇:
Spring Batch 之 Spring Batch 简介(一)
Spring Batch 之 框架流程简单介绍(二)
Spring Batch 之 Sample(Hello World)(三)
Spring Batch 之 Sample(CSV文件操作)(四)
Spring Batch 之 Sample(XML文件操作)(五)
Spring Batch 之 Sample(固定长格式文件读写)(六)
Spring Batch 之 Sample(复合格式文件的读、多文件的写)(七)
Spring Batch 之 Sample(游标方式读写DB数据表)(八)
Spring Batch 之 skip讲解(九)
Spring Batch 之 JobParameters(十)
后续关于Spring Batch的随笔继续更新中
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
Spring Batch 之 Spring Batch 简介(一)
Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring
Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。
业务方案:
1、批处理定期提交。
2、并行批处理:并行处理工作。
3、企业消息驱动处理
4、大规模的并行处理
5、手动或是有计划的重启
6、局部处理:跳过记录(如:回滚)
技术目标:
1、利用Spring编程模型:使程序员专注于业务处理,让Spring框架管理流程。
2、明确分离批处理的执行环境和应用。
3、提供核心的,共通的接口。
4、提供开箱即用(out of the box)的简单的默认的核心执行接口。
5、提供Spring框架中配置、自定义、和扩展服务。
6、所有存在的核心服务可以很容的被替换和扩展,不影响基础层。
7、提供一个简单的部署模式,利用Maven构建独立的Jar文件。
Spring Batch的结构:
这种分层结构有三个重要的组成部分:应用层、核心层、基础架构层。应用层包含所有的批处理作业,通过Spring框架管理程序员自定义的代码。核心层包含了Batch启动和控制所需要的核心类,如:JobLauncher、Job和step等。应用层和核心层建立在基础构架层之上,基础构架层提供共通的读(ItemReader)、写(ItemWriter)、和服务(如RetryTemplate:重试模块。可以被应用层和核心层使用)。
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
Spring Batch 之 框架流程简单介绍(二)
Spring Batch流程介绍:
上图描绘了Spring Batch的执行过程。说明如下:
每个Batch都会包含一个Job。Job就像一个容器,这个容器里装了若干Step,Batch中实际干活的也就是这些Step,至于Step干什么活,无外乎读取数据,处理数据,然后将这些数据存储起来(ItemReader用来读取数据,ItemProcessor用来处理数据,ItemWriter用来写数据) 。JobLauncher用来启动Job,JobRepository是上述处理提供的一种持久化机制,它为JobLauncher,Job,和Step实例提供CRUD操作。
外部控制器调用JobLauncher启动一个Job,Job调用自己的Step去实现对数据的操作,Step处理完成后,再将处理结果一步步返回给上一层,这就是Batch处理实现的一个简单流程。
Step执行过程:
从DB或是文件中取出数据的时候,read()操作每次只读取一条记录,之后将读取的这条数据传递给processor(item)处理,框架将重复做这两步操作,直到读取记录的件数达到batch配置信息中”commin-interval”设定值的时候,就会调用一次write操作。然后再重复上图的处理,直到处理完所有的数据。当这个Step的工作完成以后,或是跳到其他Step,或是结束处理。
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
这就是一个SpringBatch的基本工作流程。
下次,将通过“Hello World”实例,与大家共同探讨SpringBatch的具体应用和实现。
Spring Batch 之 Sample(Hello World)(三)
通过前面两篇关于Spring Batch文章的介绍,大家应该已经对Spring Batch有个初步的概念了。这篇文章,将通过一个”Hello World!”实例,和大家一起探讨关于Spring Batch的一些基本配置和实现。使大家从开发的角度对Spring Batch有一个真切的体会。
说明:1,本实例使用的是spring-batch 2.1.8
2,本实例没有像前面讲的那样配置ItemReader、ItemProcessor和ItemWriter,而是之间在Step中调用Tasklet,由Tasklet完成”Hello World!”的输出。
工程结构如下图:
类用来启动Bath,用来完成输出工作。用来配置一些Spring信息,配置Job信息。
文件配置如下:
xmlns:xsi="/2001/XMLSchema-instance" xmlns:p="/schema/p" xmlns:tx="/schema/tx" xmlns:aop="/schema/aop" xmlns:context="/schema/context" xsi:schemaLocation="/schema/beans /schema/beans/ /schema/tx /schema/tx/ /schema/aop /schema/aop/ /schema/context /schema/context/" default-autowire="byName"> toryFactoryBean"> class="celessTransactionManager"/>
jobLauncher负责batch的启动工作,jobRepository负责job的整个运行过程中的CRUD操作,transactionManager负责事务的管理操作。
文件配置如下:
xmlns:bean="/schema/beans" xmlns:xsi="/2001/XMLSchema-instance" xmlns:p="/schema/p" xmlns:tx="/schema/tx" xmlns:aop="/schema/aop" xmlns:context="/schema/context" xsi:schemaLocation="/schema/beans /schema/beans/ /schema/tx /schema/tx/ /schema/aop /schema/aop/ /schema/context /schema/context/ /schema/batch /schema/batch/"> Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
配置了一个ID为helloWorldJob的job,此job有两个Step : step_hello和step_world,前者负责输出“Hello ”,后者负责输出“World!”,当第一个Step完成以后,执行第二个Step。
writeTasklet类的代码如下:
public class writeTasklet implements Tasklet {
/** Message */
private String message;
/**
* @param message
* the message to set
*/
public void setMessage(String message) {
e = message;
}
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1)
throws Exception {
n(message);
return ED;
}
}
此类中定义了一个message属性,通过的“hello”和“world” Bean为其注入值。 execute方法,是由Tasklet接口继承而来的,是Tasklet实现业务逻辑的地方。此实例中只是简单的输出Message信息后,直接返回。
启动类JobLaunch类的代码如下:
public class JobLaunch {
/**
* @param args
*/
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext(
"");
JobLauncher launcher = (JobLauncher) n("jobLauncher");
Job job = (Job) n("helloWorldJob");
try {
/* 运行Job */
JobExecution result = (job, new JobParameters());
/* 处理结束,控制台打印处理结果 */
n(ng());
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
} catch (Exception e) {
tackTrace();
}
}
}
本例通过Spring配置的方式取得JobLauncher和Job对象,然后由JobLauncher的run方法启动job,参数JobParameters是标志job的一些参数,处理结束后,控制台输出处理结果。
上面就是通过SpringBatch运行一个"Hello World”程序所需要的基本配置。由于其优势是处理大批量的数据,所以仅仅为了输出"Hello World"而编写这么多代码和配置文件,确实显得有些笨拙,也体现不出其优越性。
下次,将通过读取一个CSV文件,经过简单的处理,再写入另外一个CSV文件的实例,与大家共同探讨SpringBatch的应用。
Spring Batch 之 Sample(CSV文件操作)(四)
本文将通过一个完整的实例,与大家一起讨论运用Spring Batch对CSV文件的读写操作。此实例的流程是:读取一个含有四个字段的CSV文件(ID,Name,Age,Score),对读取的字段做简单的处理,然后输出到另外一个CSV文件中。
工程结构如下图:
JobLaunch类用来启动Job, CsvItemProcessor类用来对Reader取得的数据进行处理, Student类是一个POJO类,用来存放映射的数据。 是数据读取文件, 是数据输出文件。
文件配置如前篇文章,不再赘述。
文件中Job配置如下:
这个文件里配置了这次运行的JOB:csvJob。本Job包含一个Step,完成一个完整的CSV文件读写功能。分别由 csvItemReader完成CSV文件的读操作,由 csvItemProcessor完成对取得数据的处理,由
csvItemWriter完成对CSV文件的写操作。
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
文件中csvItemReader配置如下:
class="leItemReader" scope="step"> class="tLineMapper"> class="apperFieldSetMapper">
csvItemReader实现的是Spring Batch提供FlatFileItemReader类,此类主要用于Flat文件的读操作。它包含两个必要的属性 resource和 lineMapper。前者指定要读取的文件的位置,后者是将文件的每一行映射成一个Pojo对象。其中 lineMapper也有两个重要属性 lineTokenizer和 fieldSetMapper, lineTokenizer将文件的一行分解成一个 FieldSet,然后由 fieldSetMapper映射成Pojo对象。
这种方式与DB的读操作非常类似。lineMapper类似于ResultSet,文件中的一行类似于Table中的一条记录,被封装成的FieldSet,类似于RowMapper。至于怎么将一条记录封装,这个工作由lineTokenizer的继承类DelimitedLineTokenizer完成。DelimitedLineTokenizer的delimiter属性决定文件的一行数据按照什么分解,默认的是“,”, names属性标示分解的每个字段的名字,传给fieldSetMapper(本实例用的是BeanWrapperFieldSetMapper)的时候,就可以按照这个名字取得相应的值。fieldSetMapper的属性prototypeBeanName,是映射Pojo类的名字。设置了此属性后,框架就会将lineTokenizer分解成的一个
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
FieldSet映射成Pojo对象,映射是按照名字来完成的(lineTokenizer分解时标注的名字与Pojo对象中字段的名字对应)。
总之,FlatFileItemReader读取一条记录由以下四步完成:1,从resource指定的文件中读取一条记录;2,lineTokenizer将这条记录按照delimiter分解成Fileset,每个字段的名字由names属性取得;3,将分解成的Fileset传递给fieldSetMapper,由其按照名字映射成Pojo对象;4,最终由FlatFileItemReader将映射成的Pojo对象返回,框架将返回的对象传递给Processor。
csvItemProcessor实现的是ItemProcessor类。此类接受Reader映射成的Pojo对象,可以对此对象做相应的业务逻辑处理,然后返回,框架就会将返回的结果传递给Writer进行写操作。具体实现代码如下:
package ;
import ocessor;
import ent;
/**
* ItemProcessor类。
*/
@Component("csvItemProcessor")
public class CsvItemProcessor implements ItemProcessor
/**
* 对取到的数据进行简单的处理。
*
* @param student
* 处理前的数据。
* @return 处理后的数据。
* @exception Exception
* 处理是发生的任何异常。
*/
@Override
public Student process(Student student) throws Exception {
/* 合并ID和名字 */
e(() + "--" + e());
/* 年龄加2 */
(() + 2);
/* 分数加10 */
re(re() + 10);
/* 将处理后的结果传递给writer */
return student;
}
}
文件中csvItemWriter配置如下:
class="leItemWriter" scope="step"> Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register! class="tedLineAggregator"> class="apperFieldExtractor">
csvItemWriter实现的是FlatFileItemWriter类。此类与FlatFileItemReader类相似,也有两个重要的属性:resource和lineAggregator。前者是要输出的文件的路径,后者和lineTokenizer类似。lineAggregator(本实例用DelimitedLineAggregator类)也有两个重要的属性:delimiter和fieldExtractor。Delimiter标示输出的字段以什么分割,后者将Pojo对象组装成由Pojo对象的字段组成的一个字符串。同样FlatFileItemWriter写一条记录也有以下四步完成:1,Processor传递过来一个对象给lineAggregator;2,lineAggregator将其这个对象转化成一个数组;3,再由lineAggregator的属性fieldExtractor将数组转化成按照delimiter分割一个字符串;4,将这个字符串输出。
这样,一条数据的读、处理、写操作就基本完成了。当然,读和写也可以自己写类来处理,只是要注意继承FlatFileItemReader和FlatFileItemWriter就可以了。
实例中用到的Student类代码如下:
package ;
/** Pojo类_Student */
public class Student {
/** ID */
private String ID = "";
/** 名字 */
private String name = "";
/** 年龄 */
private int age = 0;
/** 分数 */
private float score = 0;
/*getter 和setter已删除*/
}
实例中用到的输入数据如下:
实例输出结果如下:
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
本文的配置要注意以下两点:
1, 注意Writer的resource要写成“file:******”形式,不能用“classpath:******”形式。
2, 如果将Job配置中commit-interval属性配置为大于1时,每次commit的都是最后一条记录,前面读取的被覆盖了。具体原因不明,如果将Reader的fieldSetMapper属性自己重写,就可以解决这个问题。(注:student bean添加scope属性可以解决此问题:scope:"prototype".2011/12/16)
下次,将和大家一起讨论关于XML文件的读写问题。
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
Spring Batch 之 Sample(XML文件操作)(五)
前篇关于Spring Batch的文章,讲述了Spring Batch 对CSV文件的读写操作。 本文将通过一个完整的实例,与大家一起讨论运用Spring Batch对XML文件的读写操作。实例流程是从一个XML文件中读取商品信息,经过简单的处理,写入另外一个XML文件中。
工程结构如下图:
是log处理的配置文件,与本文没有必然联系,再此不做论述。
文件内容如下:
1
2 3 xmlns:xsi="/2001/XMLSchema-instance" xmlns:p="/schema/p" 4 xmlns:tx="/schema/tx" xmlns:aop="/schema/aop" 5 xmlns:context="/schema/context" 6 xsi:schemaLocation="/schema/beans 7 /schema/beans/ 8 /schema/tx 9 /schema/tx/ 10 /schema/aop 11 /schema/aop/ 12 /schema/context 13 /schema/context/" 14 default-autowire="byName"> 15 16 17 18 19 20 class="JobLauncher"> 21 22 23 24 25 26 27 class="celessTransactionManager"> Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register! 28 29
复制代码
17行是base-spckage的指定,是spring的用法。
19-22行配置的jobLauncher用来启动Job。
24行配置的jobRepository为job提供持久化操作。
26-28行的transactionManager提供事物管理操作。
本文核心配置文件内容如下:
1
2 3 xmlns:bean="/schema/beans" xmlns:xsi="/2001/XMLSchema-instance" 4 xmlns:p="/schema/p" xmlns:tx="/schema/tx" 5 xmlns:aop="/schema/aop" xmlns:context="/schema/context" 6 xmlns:util="/schema/util" 7 xsi:schemaLocation="/schema/beans 8 /schema/beans/ 9 /schema/tx 10 /schema/tx/ 11 /schema/aop 12 /schema/aop/ 13 /schema/context 14 /schema/context/ 15 /schema/batch 16 /schema/batch/ 17 /schema/util /schema/util/"> 18 19 20 21 22 23 24 25 commit-interval="10"> 26 27 28 29 30 31 32 33 class="entItemReader" scope="step"> 34 35 36 Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register! 37 value="file:#{jobParameters['inputFilePath']}"> 38 39 40 41 42 class="entItemWriter" scope="step"> 43 44 45 46 value="file:#{jobParameters['outputFilePath']}" /> 47 48 49 50 class="mMarshaller"> 51 52 53 54 value="" /> 55 56 57 58 59
复制代码
21-29行配置了Job的基本信息。此Job包含一个Step,Step中包含了基本的读(xmlReader),处理(xmlProcessor),写(xmlWriter)。
32-38行配置了对XML文件读操作。对XML的读是由SpringBatch提供的StaxEventItemReader类来完成。要读取一个XML文件,首先要知道这个文件的存放路径,resource属性就是指定文件路径信息的。知道了文件路径,还需要知道要读取的XML的根节点名称,fragmentRootElementName属性就是指定根节点名称的。知道了根节点名称,还需要知道的一点就是怎么解析这个节点信息,unmarshaller就负责完成解析节点信息,并映射成程序pojo对象。注意,根节点并不是指整个XML文件的根节点,而是指要读取的信息片段的根节点,不管这个节点片段处在哪一层,框架都会遍历到。
49-58行配置了解析XML节点信息的unmarshaller。其中entry的key指定对应根节点名称goods,value指定程序的pojo类,这样,程序就可以将goods节点下的子节点与pojo类(Goods)中的属性去匹配,当匹配到子节点名与pojo类中的属性名相同时,就会将子节点的内容赋值给pojo类的属性。这样就完成了一个根节点的读取,框架会控制循环操作,直到将文件中所有根(goods)节点全部读完为止。这样就完成了XML文件的读操作。
41-47行配置了对XML文件的写操作。与读XML文件一样,要写一个XML文件,也是需要知道这个文件的文件的存放路径的,同样是resource属性提供文件的路径信息。同时,也是需要知道这个文件的跟节点信息的,rootTagName属性提供根节点名信息。注意此处的根节点,指整个文件的跟节点,与读得时候稍有区别,从两个属性的名称上也可以看出。有了上面的信息,完成一个写操作,还需要一个把pojo对象转换成XML片段的工具,由marshaller提供。本文读操作的unmarshaller和写操作的marshaller用的是同一个转换器,因为XStreamMarshaller既提供将节点片段转换为pojo对象功能,同时又提供将pojo对象持久化为xml文件的功能。如果写的内容与读得内容有很大差异,可以另外配置一个转换器。
文件配置的对XML文件的读写操作,至于读出来的信息做怎么样的处理再写入文件,通过简单的配置恐怕就无法完成了,就需要我们自己写代码完成了。XMLProcessor类就是完成这个工作的。只要在Job的配置文件中指定到这个类就可以了。XMLProcessor类的内容如下:
package ;
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
import ;
import ocessor;
import ent;
import ;
/**
* XML文件处理类。
*/
@Component("xmlProcessor")
public class XMLProcessor implements ItemProcessor
/**
* XML文件内容处理。
*
*/
@Override
public Goods process(Goods goods) throws Exception {
// 购入日期变更
Day(new Date());
// 顾客信息变更
tomer(tomer() + "顾客!");
// ISIN变更
n(n() + "IsIn");
// 价格变更
ce(ce() + 1000.112);
// 数量变更
ntity(ntity() + 100);
// 处理后的数据返回
return goods;
}
}
内容很简单,再此就不做赘述了。要注意的一点就是红背景色的地方。加了此标签无须在文件增加对xmlProcessor声明用的bean,可以在job中直接引用,这是Spring的功能。当然,实现这个的前提是要在中配置base-package,只有这样才能找到。
工程结构图中的XMLLaunch类用来启动Job。内容如下:
package ;
import ;
import cution;
import ametersBuilder;
import ncher;
import ationContext;
import athXmlApplicationContext;
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
public class XMLLaunch {
/**
* @param args
*/
public static void main(String[] args1) {
ApplicationContext context = new ClassPathXmlApplicationContext(
"");
JobLauncher launcher = (JobLauncher) n("jobLauncher");
Job job = (Job) n("xmlFileReadAndWriterJob");
try {
// JOB实行
JobExecution result = (job, new JobParametersBuilder()
.addString("inputFilePath", "C:")
.addString("outputFilePath", "C:")
.toJobParameters());
// 运行结果输出
n(ng());
} catch (Exception e) {
tackTrace();
}
}
}
注意其中为Job提供的两个动态参数,以及在配置文件中的用法。
pojo类Goods的内容如下:
package ;
import rmat;
import DateFormat;
import ;
/**
* 商品信息类.
*/
public class Goods {
/** isin号 */
private String isin;
/** 数量 */
private int quantity;
/** 价格 */
private double price;
/** 客户 */
private String customer;
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
/** 购入日期 */
private Date buyDay;
/* getter he setter已经删除 */
}
文件内容如下:
处理结果如下():
下次,将和大家一起讨论关于Spring Batch 对固定长内容文件的读写问题。
Spring Batch 之 Sample(固定长格式文件读写)(六)
前篇关于Spring Batch的文章,讲述了Spring Batch 对XML文件的读写操作。 本文将通过一个完整的实例,与大家一起讨论运用Spring Batch对固定长格式文件的读写操作。实例延续前面的例子,读取一个含有四个字段的TXT文件(ID,Name,Age,Score),对读取的字段做简单的处理,然后输出到另外一个TXT文件中。
工程结构如下图:
和前文已经叙述过,在此不做赘述。
本文核心配置文件内容如下:
1
2 3 xmlns:bean="/schema/beans" xmlns:xsi="/2001/XMLSchema-in Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register! stance" 4 xmlns:p="/schema/p" xmlns:tx="/schema/tx" 5 xmlns:aop="/schema/aop" xmlns:context="/schema/context" 6 xmlns:util="/schema/util" 7 xsi:schemaLocation="/schema/beans 8 /schema/beans/ 9 /schema/tx 10 /schema/tx/ 11 /schema/aop 12 /schema/aop/ 13 /schema/context 14 /schema/context/ 15 /schema/batch 16 /schema/batch/ 17 /schema/util /schema/util/"> 18 19 20 21 22 23 24 25 26 processor="fixedLengthProcessor" commit-interval="10"> 27 28 29 30 31 32 33 34 class="leItemReader" scope="step"> 35 36 value="file:#{jobParameters['inputFilePath']}" /> 37 38 39 class="tLineMapper"> 40 41 42 43 class="apperFieldSetMapper"> 44 45 46 47 48 Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register! 49 50 51 class="tPojo" scope="prototype" /> 52 53 class="engthTokenizer"> 54 55 56 57 58 59 60 class="leItemWriter" scope="step"> 61 62 value="file:#{jobParameters['outputFilePath']}" /> 63 64 65 class="terLineAggregator"> 66 67 68 class="apperFieldExtractor"> 69 70 71 72 73 74 75 76
复制代码
22-30行配置了Job的基本信息。此Job包含一个Step,Step中包含了基本的读(fixedLengthReader),处理(fixedLengthProcessor),写(fixedLengthWriter)以及commit件数(commit-interval)。
33-49行配置了读处理的详细信息。固定长格式和csv格式都属于flat文件格式,所以读取固定长格式文件也是需要使用Spring Batch提供的核心类FlatFileItemReader。对此类的配置在《Spring Batch 之 Sample(CSV文件操作)(四) 》中已经做过详细说明。但要注意lineTokenizer的配置,在读取CSV文件的时候,使用的是DelimitedLineTokenizer类,但是读取固定长格式的文件,需要使用FixedLengthTokenizer,如52-56行所示。其columns是如何分割一条记录信息,也就是说指定哪几列属于一个项目的信息(注意:列数的总长度与文件记录长度不一样的时候,会报错。注意限定范围)。属性names指定每个项目的名字。其名字与44行prototypeBeanName属性指定的Pojo属性名相同。
59-76行配置了写处理的详细信息。写固定长格式的文件,与写CSV格式的文件一样,也是使用Spring Batch提供的核心类FlatFileItemWriter。在此也不再赘述。但要注意lineAggregator属性使用的是FormatterLineAggregator类,此类的format属性可以指定每个项目所占的长度和格式。
文件配置了对固定长文件的和写。在读之后,写之前的处理,是通过自定的FixedLengthProcessor 类处理的。详细代码如下:
package ength;
import ocessor;
import ent;
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
/**
* 业务处理类。
*
* @author Wanggc
*/
@Component("fixedLengthProcessor")
public class FixedLengthProcessor implements
ItemProcessor
/**
* 对取到的数据进行简单的处理。
*
* @param student
* 处理前的数据。
* @return 处理后的数据。
* @exception Exception
* 处理是发生的任何异常。
*/
public StudentPojo process(StudentPojo student) throws Exception {
/* 合并ID和名字 */
e(() + "--" + e());
/* 年龄加2 */
(() + 2);
/* 分数加10 */
re(re() + 10);
/* 将处理后的结果传递给writer */
return student;
}
}
至此,对固定长格式文件的读、处理、写操作已经介绍完毕。下面是一些辅助文件的信息。
Pojo类StudentPojo的详细代码如下:
package ength;
/** Pojo类_Student */
public class StudentPojo {
/** ID */
private String ID = "";
/** 名字 */
private String name = "";
/** 年龄 */
private int age = 0;
/** 分数 */
private float score = 0;
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
/* 为节省篇幅,getter 和 setter 已经删除 */
}
Job启动类Launch的详细代码如下:
package ength;
import ;
import cution;
import ametersBuilder;
import ncher;
import ationContext;
import athXmlApplicationContext;
public class Launch {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext(
"");
JobLauncher launcher = (JobLauncher) n("jobLauncher");
Job job = (Job) n("fixedLengthJob");
try {
// JOB实行
JobExecution result = (
job,
new JobParametersBuilder()
.addString("inputFilePath",
"C:")
.addString("outputFilePath",
"C:")
.toJobParameters());
// 运行结果输出
n(ng());
} catch (Exception e) {
tackTrace();
}
}
}
input文件内容如下:
处理结果如下:
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
下次,将和大家一起讨论关于Spring Batch 对复合格式文件的读写问题。
Spring Batch 之 Sample(复合格式文件的读、多文件的写)(七)
前面关于Spring Batch的文章,讲述了SpringBatch对CSV文件的读写操作、对XML文件的操作,以及对固定长格式文件的操作。这些事例,同一个Reader读取的都是相同格式的数据,最终写入一个文件。如果遇到下面这样的数据,并想将学生信息和商品信息分类后写入两个文件,应该如何处理呢?
student,200001,ZhangSan,18,78
goodsPNH.1zhangshana2011/12/18 01:12:36
student,200002,LiSi,19,79
goodsPNH.1zhangshanb2011/12/19 01:12:36
student,200003,WangWu,20,80
goodsPNH.1zhangshanc2011/12/20 01:12:36
* 以student开头的数据代表学生信息,以goods开头代表商品信息
这次将和大家一起探讨Spring Batch读取复合格式的数据,然后写入不同的文件的处理方式。
工程结构如下图:
和前文已经叙述过,在此不做赘述。
本实例的核心配置文件内容如下:
1
2 Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register! 3 xmlns:bean="/schema/beans" xmlns:xsi="/2001/XMLSchema-instance" 4 xmlns:p="/schema/p" xmlns:tx="/schema/tx" 5 xmlns:aop="/schema/aop" xmlns:context="/schema/context" 6 xmlns:util="/schema/util" 7 xsi:schemaLocation="/schema/beans 8 /schema/beans/ 9 /schema/tx 10 /schema/tx/ 11 /schema/aop 12 /schema/aop/ 13 /schema/context 14 /schema/context/ 15 /schema/batch 16 /schema/batch/ 17 /schema/util /schema/util/"> 18 19 20 21 22 23 24 25 commit-interval="1"> 26 27 28 29 30 31 32 33 34 35 36 37 class="leItemReader" scope="step"> 38 39 value="file:#{jobParameters['inputFilePath']}" /> 40 41 42 class="nMatchingCompositeLineMapper"> 43 44 45 46 47 Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register! 48 49 50 51 52 53 54 55 56 57 58 59 class="tedLineTokenizer"> 60 61 62 63 64 65 66 67 68 69 70 71 72 class="apperFieldSetMapper"> 73 74 75 76 77 78 class="t" 79 scope="prototype" /> 80 81 82 class="engthTokenizer"> 83 84 85 value="isin,quantity,price,customer,buyDay" /> 86 87 88 class="apperFieldSetMapper"> 89 90 91 92 93 class="" 94 scope="prototype" /> Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register! 95 96 97 class="temWriter"> 98 99 100 101 102 103 104 105 106 107 class="leItemWriter" scope="step"> 108 109 value="file:#{jobParameters['outputFilePathStudent']}" /> 110 111 112 class="terLineAggregator"> 113 114 115 class="apperFieldExtractor"> 116 117 118 119 120 121 122 123 124 125 class="leItemWriter" scope="step"> 126 127 value="file:#{jobParameters['outputFilePathGoods']}" /> 128 129 130 class="tedLineAggregator"> 131 132 133 class="apperFieldExtractor"> 134 135 value="isin,quantity,price,customer,buyDay" /> 136 137 138 139 140 141
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
复制代码
21-33行配置了Job的基本信息。
36-57行配置了Reader的基本信息。FlatFileItemReader的lineMapper属性使用SpringBatch核心类PatternMatchingCompositeLineMapper的时候,会将读取的记录按照不同的方式映射成我们的Pojo对象。当然首先我们要配置不同的tokenizers(43-48)和fieldSetMappers(49-54),并告诉它当前的记录按照那条原则去解析和映射。如45行所示,我们指定key为student*的时候,用studentTokenizer去解析成fieldset,用studentFieldSetMapper将studentTokenizer解析好的fieldset记录映射成Student对象。我们指定的key,其实也就是student开头的记录,*是通配符。PatternMatchingCompositeLineMapper支持两种通配符:*和?,前者代表多个字符,后者仅代表一个字符。至于student和goods信息如何映射成pojo对象,前面的文章中已经做过详细的介绍,这里就不做赘述了。
96-104行配置了Writer的基本信息。Writer也是使用代理的方式,学生信息使用106-122行定义的studentWriter按照固定长的格式写入学生信息文件中,商品信息使用124-141行定义的goodsWriter按照CSV的格式写入商品信息文件中。MultiItemWriter的代码很简单,就不做详细解释了。如下:
package ypessinglefile;
import ist;
import ;
import iter;
import ;
import t;
/**
* 写处理类。
*
* @author Wanggc
*
* @param
*/
@SuppressWarnings("unchecked")
public class MultiItemWriter
/** 写代理 */
private List
public void setDelegates(List
tes = delegates;
}
@Override
public void write(List extends T> items) throws Exception {
// 学生信息的Writer
ItemWriter studentWriter = (ItemWriter) (0);
// 商品信息的Writer
ItemWriter goodsWriter = (ItemWriter) (1);
// 学生信息
List
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
// 商品信息
List
// 将传过来的信息按照不同的类型添加到不同的List中
for (int i = 0; i < (); i++) {
if ("Student".equals((i).getClass().getSimpleName())) {
((Student) (i));
} else {
((Goods) (i));
}
}
// 如果学生List中有数据,就执行学生信息的写
if (() > 0) {
(studentList);
}
// 如果商品List中有数据,就执行商品信息的写
if (() > 0) {
(goodsList);
}
}
}
至此,复合文件的读写操作已经讨论结束了。注意实例没有配置Processor。下面是一些辅助文件的信息。
student和goods类的信息与前面文章一样,就不再贴出代码了。
Job启动的代码如下:
package ypessinglefile;
import ;
import cution;
import ametersBuilder;
import ncher;
import ationContext;
import athXmlApplicationContext;
public class Launch {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext(
"");
JobLauncher launcher = (JobLauncher) n("jobLauncher");
Job job = (Job) n("multiTypeSingleFileJob");
try {
// JOB实行
JobExecution result = (
job,
new JobParametersBuilder()
.addString("inputFilePath",
"C:")
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
.addString("outputFilePathStudent",
"C:")
.addString("outputFilePathGoods",
"C:")
.toJobParameters());
// 运行结果输出
n(ng());
} catch (Exception e) {
tackTrace();
}
}
}
Input文件内容如下图:
处理结果的学生信息文件如下图:
处理结果的商品信息文件如下图:
Spring Batch对复合格式文件的读写操作就讨论到这里。至此,Spring Batch对文件简单操作的讨论也告一段落,下次将讨论Spring Batch读写DB的操作。
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
Spring Batch 之 Sample(游标方式读写DB数据表)(八)
前面关于Spring Batch的文章,讲述了SpringBatch对Flat、XML等文件的读写操作,本文将和大家一起讨论Spring
Batch对DB的读写操作。Spring Batch对DB数据的读取操作提供两种形式,一种是以游标为基础,一条条的读取数据;另外一种是分页的方式读取DB。
通过前面文章的讲解,大家应该对SpringBatch的框架和基本配置有了一定的了解。为了不显得啰嗦,本文只提供读取DB方式的核心配置,一些辅助信息的配置,如果不明白,可以参照前面几讲的内容。
首先是读操作:
1 2 class="rsorItemReader" 3 scope="step"> 4 5 6 value="select ID,USERID,USERNAME,PASSWORD from T_USER where id < ?" /> 7 8 9 10 value="" /> 11
12
13
14
15
16 17 class="eparedStatementSetter" 18 scope="step"> 19 20 21 22 23 24
复制代码
Spring Batch对DB基于游标的读取数据操作,是由其核心类JdbcCursorItemReader来实现的。一般来说,从DB数据表中读取数据一般有以下几个步骤。首先是DB连接,这些连接DB的基本信息(像DB服务器地址、用户名、密码等信息)由dataSource属性提供,SpringBatch没有提专门供特殊的类,用的是Spring框架的DataSource。连接上了DB,下面需要关注的就是对DB具体的查询操作了,也就是SQL文的相关信息了,由其sql属性实现,主要是拼接SQL文的一个字符串。有了sql文,有可能需要传递参数,sql文参数的一些信息,由preparedStatementSetter属性来满足,具体的实现可以由SpringBatch提供的核心类ListPreparedStatementSetter来设置,如代码的16-24行。连接上了DB,执行了SQL文,最后要关心的就是查询结果的存放问题了,这一点由JdbcCursorItemReader的rowMapper属性来实现,如代码的7-12行。这样就可以将DB数据表中的数据一条条映射成我们的Pojo对象了,也就完成了读操作。
其次是写操作:
1 2 class="tchItemWriter"> 3 4 5 value="insert into T_DESTUSER (ID,USERID,USERNAME,PASSWORD,UPDATETIME,UPDATEUSER) Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register! 6 values 7 (:id,:userId,:userName,:password,:updateDate,:updateUser)" /> 8 9 10 class="opertyItemSqlParameterSourceProvider" /> 11 12
复制代码
写DB和读DB思路是一样的,只不过一个是从DB里读,一个是往DB里写。是由SpringBatch框架的JdbcBatchItemWriter类实现的。也有以下几个步骤:首先是连接DB,也是由dataSource属性提供;其次是执行的SQL文,有sql属性满足,最后就是如何传递参数了。写操作传递参数的时候,跟读操作有一定区别。写操作提供两种传递参数的方式:一种是直接传递一个对象进去,如上述代码8-11行所示,itemSqlParameterSourceProvider属性设置为BeanPropertyItemSqlParameterSourceProvider的实现就可以了。SQL文中的参数也是使用【:对象属性名】的方式,如上述代码的7行所示,并且区分大小写。第二种传递参数的方式是:设置JdbcBatchItemWriter类的itemSqlParameterSourceProvider属性(设置方式与读操作的paramStatementSetter属性的设置方式大同小异),当然,sql也和第一种方式有区别,使用如下形式:insert into T_DESTUSER
(ID,USERID,USERNAME,PASSWORD,UPDATETIME,UPDATEUSER) values (?,?,?,?,?,?).两种方式各有优势,根据实际情况自行选择。
至此,SpringBatch读写DB的简单操作就介绍完了。下次将和大家一起讨论开发中遇到的一些高级特性和实际问题。
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
Spring Batch 之 skip讲解(九)
前面的文章跟大家一起讨论了Spring Batch的概念,处理流程,以及SpringBatch处理文件、DB的一些简单实例。接下来的讨论,主要是关于Spring Batch的一些高级应用处理和实际开发中需要注意的一些问题。
今天主要和大家讨论SpringBatch关于skip容错机制的一些处理。
一、skip的介绍
在实际的项目开发中,我们常常要将几十万甚至上百万的数据从文件导入到DB中,如果其中某条数据导入时发生例外,我们并不想整个Job以失败而结束,而是希望能将错误的数据经过处理后保存起来,其余正确的数据继续做导入处理。如果遇到这样的场景,SpringBatch的skip机制就可以派上用场了。顾名思义,skip的作用就是跳过某些数据(例如错误数据)。
二、配置skip信息
配置skip的示例代码如下:
1
2
3
4 5 commit-interval="1" skip-limit="1000"> 6 7 8 9
10
11
12
复制代码
代码第5行chunk的skip-limit属性是指允许跳过记录的行数,6-8行是指允许发生的例外,也就是说在发生FlatFileParseException(及其子类)的时候,job是不会被终止的,而是跳过当前的记录,去执行下面那条记录。 上面的代码也会有另外一个问题,就是发生FlatFileParseException以外例外的时候,Job也会失败。这也满足不了我们上面说的那种场景,当然,6-8行还有另外一种配置方式,如下:
1
2
3
4
复制代码
include是允许跳过的错,exclude是不允许跳过的错。如果像上诉代码那样配置的话,所有Exception及其子类(FileNotFoundException除外)发生时,Job都不会被终止;但是当FileNotFoundException发生时,虽然它也是Exception的子类,但Job会被终止,因为FileNotFoundException属于exclude属性的class。
三、skip深入讲解
是谁在决定当前的记录跳过与否呢?其实,当Reader、Processor和Writer抛出例外的时候,SpringBatch会调用skip机制,来判断当前例外发生时,正在被处理的记录是否被跳过。当在上面的代码中配置skippable-exception-classes属性的时候,SpringBatch会默认的调用LimitCheckingItemSkipPolicy类。如果简单的配置skip-limit和skippable-exception-classes不能满足需求时,也可以定义自己的skip策略。代码如下:
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
1 package ;
2
3 import mitExceededException;
4 import licy;
5
6 /**
7 * 自定义Skip策略类。
8 * @author Wanggc
9 */
10 public class MySkipPolicy implements SkipPolicy {
11
12 @Override
13 public boolean shouldSkip(Throwable t, int skipCount)
14 throws SkipLimitExceededException {
15 // TODO Auto-generated method stub
16 return false;
17 }
18 }
复制代码
如示例代码所示,要实现SkipPolicy接口,在shouldSkip方法中定义自己的skip策略。返回false时,说明当前例外不能被跳过,否则可以被跳过。当然,定义了自己的skip策略还不够,还要告诉框架要使用自己定义的skip策略,而不是框架默认的。这就需要添加chunk的另外一个属性skip-policy。代码如下:
1
2
3
4 5 commit-interval="1" skip-limit="1000" skip-policy="mySkipPolicy"> 6 7 8 class="leParseException" /> 9 10
11
12
13
14
复制代码
添加了skip-policy属性后,skip-limit和skippable-exception-classes默认策略将不再起作用。当然,可以将其删除,示例中属于垃圾代码。
当Reader、Processor和Writer抛出例外的时候,SpringBatch处理skip策略的方式是不同的。当Reader发生可以被skip的例外时,SpringBatch会接着去读下面一条记录,并不会回滚事务。当Processor发生可以被skip的例外时,SpringBatch会回滚当前chunk的事务,并将除了引发例外以外的数据传给Writer。当Writer发生可以被skip的例外的时,SpringBatch首先回滚事务,因为传给Writer的是一个list,所以Writer不知道是list中那条记录造成了例外的发生。Writer会将list拆开,一条条的处理,正确的数据提交,错误的数据回滚。
对SpringBatch的skip机制的讨论就到这里了,接下来会讨论其他一些高级属性。
Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
Spring Batch 之 JobParameters (十)
继续前面关于Spring Batch系列的文章,本文主要介绍与JobParameters相关的一些知识。
一、JobParameters
顾名思义,所谓JobParameters,就是Job运行时的参数。它在bath中有两个作用:一是标示不同的jobInstance,二是作为job中用到的信息,以参数的形式传给job。
如何使用JobParameters呢?它主要是在启动的job的时候,与job联系起来的。看一下框架提供的启动job的接口JobLauncher的源代码,就会发现其run方法需要两个参数,一个是Job,也就是需要启动的job,另一个就是JobParameters。可以通过如下方式使用:
(job, new JobParametersBuilder()
.addString("para1", "value1")
.addString("para2","value2")
.toJobParameters()
);
如代码所示,参数para1和para2就可以传给Job了,在Job中如果需要使用参数信息,可以使用Spring注入的方式传给不同的使用对象。
注意需要设置Bean的scope属性为step,这是SpringBatch的一个后绑定技术,就是在生成Step的时候,才去创建bean,因为这个时候jobparameter才传过来。如果加载配置信息的时候就创建bean,这个时候jobparameter的值还没有产生,会抛出异常。
二、JobParametersValidator
SpringBatch框架支持JobParameters的简单验证,并提供了JobParametersValidator接口和validate方法,用于在job启动之前对参数信息验证和检查。如果需要对参数进行验证,就可以实现此接口,并在validate方法中定制验证规则,当验证失败的时候,会抛出JobParametersInvalidException异常。当然,SpringBatch框架也提供了一个默认的验证类DefaultJobParametersValidator,但此类验证功能有限,主要用于必须项和非必须项的验证。通过如下配置可以实现此验证功能。
......
class="tJobParametersValidator"> Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!
DefaultJobParametersValidator类提供两个重要属性:requiredKeys和optionalKeys,前者为JobParameters中必须包含的项,但对参数的实际值不做check(不做空的check),只要包含此项的key就可以了;后者为可选项,有没有此项都可以。但如果JobParameters中含有两者以外的项,也会抛出JobParametersInvalidException异常。
三、JobParametersIncrementer
JobParametersIncrementer主要用于JobOperator接口的startNextInstance等方法启动job的情况下。同一个Job在batch启动后被多次调用的场合,startNextInstance方法将会非常有用,因为它将使用JobParametersIncrementer与Job绑定,创建一个新实例。因为JobParametersIncrementer有一个getNext方法,可以在此方法中为parameters添加一个自增的值,以区分不同的Job实例,当然,这个值在job的其他的地方并不会用到,仅仅是为了标示不同JobInstance。当然SpringBatch框架也为我们提供了一个JobParametersIncrementer的实现类RunIdIncrementer 。使用方法如下:
......
class="ncrementer"/> RunIdIncrementer的getNext方法实现如下: public JobParameters getNext(JobParameters parameters) { if (parameters == null) { parameters = new JobParameters(); } long id = g(key, 0L) + 1; return new JobParametersBuilder(parameters).addLong(key, id).toJobParameters(); } 由代码可以看出,当parameters为null时,创建一个新的JobParameters,并添加一项“key”;不为null时,直接给原来的parameters添加一项。最后直接返回。因为key代表的value每次是都会在原来的基础上加1,这样就保证了每次创建的jobInstance是不同的了。 以上,就是JobParameters主要知识点的介绍,下次,将介绍SpringBatch其他的一些技术知识点。
版权声明:本文标题:Spring Batch系列文章汇总 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/free/1705372335h483037.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论