admin 管理员组

文章数量: 887063


2024年1月16日发(作者:常见的ascii码表)

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

Spring Batch

作 者:孤旅者

Blog:/gulvzhe

整理者:大海

声 明:此系列博文出自博客园,版权归原作者和博客园所有,转载请注明出处。只能用于学习之用。任何版权问题整理者概不负责!

Spring Batch系列总括

最近一个项目在使用SpringBatch框架做一个电子商务平台的批处理。网上资料很有限,尤其是中文资料更是少之又少,官网上的文档也只是讲一些入门的基础知识,大部分高级特性都是一笔带过,讲解的很不彻底,在实际开发中碰到的问题很多。因此,特将自己学习、应用Spring Batch的过程总结成一个个小实例写成随笔。一是备忘,二是抛砖引玉,希望更多的高手能参与进来,指出其中的不足和提出自己的见解,大家共通讨论学习。

写过的关于SpringBatch的随笔主要有以下几篇:

Spring Batch 之 Spring Batch 简介(一)

Spring Batch 之 框架流程简单介绍(二)

Spring Batch 之 Sample(Hello World)(三)

Spring Batch 之 Sample(CSV文件操作)(四)

Spring Batch 之 Sample(XML文件操作)(五)

Spring Batch 之 Sample(固定长格式文件读写)(六)

Spring Batch 之 Sample(复合格式文件的读、多文件的写)(七)

Spring Batch 之 Sample(游标方式读写DB数据表)(八)

Spring Batch 之 skip讲解(九)

Spring Batch 之 JobParameters(十)

后续关于Spring Batch的随笔继续更新中

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

Spring Batch 之 Spring Batch 简介(一)

Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring

Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。

业务方案:

1、批处理定期提交。

2、并行批处理:并行处理工作。

3、企业消息驱动处理

4、大规模的并行处理

5、手动或是有计划的重启

6、局部处理:跳过记录(如:回滚)

技术目标:

1、利用Spring编程模型:使程序员专注于业务处理,让Spring框架管理流程。

2、明确分离批处理的执行环境和应用。

3、提供核心的,共通的接口。

4、提供开箱即用(out of the box)的简单的默认的核心执行接口。

5、提供Spring框架中配置、自定义、和扩展服务。

6、所有存在的核心服务可以很容的被替换和扩展,不影响基础层。

7、提供一个简单的部署模式,利用Maven构建独立的Jar文件。

Spring Batch的结构:

这种分层结构有三个重要的组成部分:应用层、核心层、基础架构层。应用层包含所有的批处理作业,通过Spring框架管理程序员自定义的代码。核心层包含了Batch启动和控制所需要的核心类,如:JobLauncher、Job和step等。应用层和核心层建立在基础构架层之上,基础构架层提供共通的读(ItemReader)、写(ItemWriter)、和服务(如RetryTemplate:重试模块。可以被应用层和核心层使用)。

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

Spring Batch 之 框架流程简单介绍(二)

Spring Batch流程介绍:

上图描绘了Spring Batch的执行过程。说明如下:

每个Batch都会包含一个Job。Job就像一个容器,这个容器里装了若干Step,Batch中实际干活的也就是这些Step,至于Step干什么活,无外乎读取数据,处理数据,然后将这些数据存储起来(ItemReader用来读取数据,ItemProcessor用来处理数据,ItemWriter用来写数据) 。JobLauncher用来启动Job,JobRepository是上述处理提供的一种持久化机制,它为JobLauncher,Job,和Step实例提供CRUD操作。

外部控制器调用JobLauncher启动一个Job,Job调用自己的Step去实现对数据的操作,Step处理完成后,再将处理结果一步步返回给上一层,这就是Batch处理实现的一个简单流程。

Step执行过程:

从DB或是文件中取出数据的时候,read()操作每次只读取一条记录,之后将读取的这条数据传递给processor(item)处理,框架将重复做这两步操作,直到读取记录的件数达到batch配置信息中”commin-interval”设定值的时候,就会调用一次write操作。然后再重复上图的处理,直到处理完所有的数据。当这个Step的工作完成以后,或是跳到其他Step,或是结束处理。

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

这就是一个SpringBatch的基本工作流程。

下次,将通过“Hello World”实例,与大家共同探讨SpringBatch的具体应用和实现。

Spring Batch 之 Sample(Hello World)(三)

通过前面两篇关于Spring Batch文章的介绍,大家应该已经对Spring Batch有个初步的概念了。这篇文章,将通过一个”Hello World!”实例,和大家一起探讨关于Spring Batch的一些基本配置和实现。使大家从开发的角度对Spring Batch有一个真切的体会。

说明:1,本实例使用的是spring-batch 2.1.8

2,本实例没有像前面讲的那样配置ItemReader、ItemProcessor和ItemWriter,而是之间在Step中调用Tasklet,由Tasklet完成”Hello World!”的输出。

工程结构如下图:

类用来启动Bath,用来完成输出工作。用来配置一些Spring信息,配置Job信息。

文件配置如下:

xmlns:xsi="/2001/XMLSchema-instance" xmlns:p="/schema/p"

xmlns:tx="/schema/tx" xmlns:aop="/schema/aop"

xmlns:context="/schema/context"

xsi:schemaLocation="/schema/beans

/schema/beans/

/schema/tx

/schema/tx/

/schema/aop

/schema/aop/

/schema/context

/schema/context/"

default-autowire="byName">

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

toryFactoryBean">

class="celessTransactionManager"/>

jobLauncher负责batch的启动工作,jobRepository负责job的整个运行过程中的CRUD操作,transactionManager负责事务的管理操作。

文件配置如下:

xmlns:bean="/schema/beans" xmlns:xsi="/2001/XMLSchema-instance"

xmlns:p="/schema/p" xmlns:tx="/schema/tx"

xmlns:aop="/schema/aop" xmlns:context="/schema/context"

xsi:schemaLocation="/schema/beans

/schema/beans/

/schema/tx

/schema/tx/

/schema/aop

/schema/aop/

/schema/context

/schema/context/

/schema/batch

/schema/batch/">

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

配置了一个ID为helloWorldJob的job,此job有两个Step : step_hello和step_world,前者负责输出“Hello ”,后者负责输出“World!”,当第一个Step完成以后,执行第二个Step。

writeTasklet类的代码如下:

public class writeTasklet implements Tasklet {

/** Message */

private String message;

/**

* @param message

* the message to set

*/

public void setMessage(String message) {

e = message;

}

@Override

public RepeatStatus execute(StepContribution arg0, ChunkContext arg1)

throws Exception {

n(message);

return ED;

}

}

此类中定义了一个message属性,通过的“hello”和“world” Bean为其注入值。 execute方法,是由Tasklet接口继承而来的,是Tasklet实现业务逻辑的地方。此实例中只是简单的输出Message信息后,直接返回。

启动类JobLaunch类的代码如下:

public class JobLaunch {

/**

* @param args

*/

public static void main(String[] args) {

ApplicationContext context = new ClassPathXmlApplicationContext(

"");

JobLauncher launcher = (JobLauncher) n("jobLauncher");

Job job = (Job) n("helloWorldJob");

try {

/* 运行Job */

JobExecution result = (job, new JobParameters());

/* 处理结束,控制台打印处理结果 */

n(ng());

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

} catch (Exception e) {

tackTrace();

}

}

}

本例通过Spring配置的方式取得JobLauncher和Job对象,然后由JobLauncher的run方法启动job,参数JobParameters是标志job的一些参数,处理结束后,控制台输出处理结果。

上面就是通过SpringBatch运行一个"Hello World”程序所需要的基本配置。由于其优势是处理大批量的数据,所以仅仅为了输出"Hello World"而编写这么多代码和配置文件,确实显得有些笨拙,也体现不出其优越性。

下次,将通过读取一个CSV文件,经过简单的处理,再写入另外一个CSV文件的实例,与大家共同探讨SpringBatch的应用。

Spring Batch 之 Sample(CSV文件操作)(四)

本文将通过一个完整的实例,与大家一起讨论运用Spring Batch对CSV文件的读写操作。此实例的流程是:读取一个含有四个字段的CSV文件(ID,Name,Age,Score),对读取的字段做简单的处理,然后输出到另外一个CSV文件中。

工程结构如下图:

JobLaunch类用来启动Job, CsvItemProcessor类用来对Reader取得的数据进行处理, Student类是一个POJO类,用来存放映射的数据。 是数据读取文件, 是数据输出文件。

文件配置如前篇文章,不再赘述。

文件中Job配置如下:

这个文件里配置了这次运行的JOB:csvJob。本Job包含一个Step,完成一个完整的CSV文件读写功能。分别由 csvItemReader完成CSV文件的读操作,由 csvItemProcessor完成对取得数据的处理,由

csvItemWriter完成对CSV文件的写操作。

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

文件中csvItemReader配置如下:

class="leItemReader" scope="step">

class="tLineMapper">

class="apperFieldSetMapper">

ID

name

age

score

csvItemReader实现的是Spring Batch提供FlatFileItemReader类,此类主要用于Flat文件的读操作。它包含两个必要的属性 resource和 lineMapper。前者指定要读取的文件的位置,后者是将文件的每一行映射成一个Pojo对象。其中 lineMapper也有两个重要属性 lineTokenizer和 fieldSetMapper, lineTokenizer将文件的一行分解成一个 FieldSet,然后由 fieldSetMapper映射成Pojo对象。

这种方式与DB的读操作非常类似。lineMapper类似于ResultSet,文件中的一行类似于Table中的一条记录,被封装成的FieldSet,类似于RowMapper。至于怎么将一条记录封装,这个工作由lineTokenizer的继承类DelimitedLineTokenizer完成。DelimitedLineTokenizer的delimiter属性决定文件的一行数据按照什么分解,默认的是“,”, names属性标示分解的每个字段的名字,传给fieldSetMapper(本实例用的是BeanWrapperFieldSetMapper)的时候,就可以按照这个名字取得相应的值。fieldSetMapper的属性prototypeBeanName,是映射Pojo类的名字。设置了此属性后,框架就会将lineTokenizer分解成的一个

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

FieldSet映射成Pojo对象,映射是按照名字来完成的(lineTokenizer分解时标注的名字与Pojo对象中字段的名字对应)。

总之,FlatFileItemReader读取一条记录由以下四步完成:1,从resource指定的文件中读取一条记录;2,lineTokenizer将这条记录按照delimiter分解成Fileset,每个字段的名字由names属性取得;3,将分解成的Fileset传递给fieldSetMapper,由其按照名字映射成Pojo对象;4,最终由FlatFileItemReader将映射成的Pojo对象返回,框架将返回的对象传递给Processor。

csvItemProcessor实现的是ItemProcessor类。此类接受Reader映射成的Pojo对象,可以对此对象做相应的业务逻辑处理,然后返回,框架就会将返回的结果传递给Writer进行写操作。具体实现代码如下:

package ;

import ocessor;

import ent;

/**

* ItemProcessor类。

*/

@Component("csvItemProcessor")

public class CsvItemProcessor implements ItemProcessor {

/**

* 对取到的数据进行简单的处理。

*

* @param student

* 处理前的数据。

* @return 处理后的数据。

* @exception Exception

* 处理是发生的任何异常。

*/

@Override

public Student process(Student student) throws Exception {

/* 合并ID和名字 */

e(() + "--" + e());

/* 年龄加2 */

(() + 2);

/* 分数加10 */

re(re() + 10);

/* 将处理后的结果传递给writer */

return student;

}

}

文件中csvItemWriter配置如下:

class="leItemWriter" scope="step">

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

class="tedLineAggregator">

class="apperFieldExtractor">

csvItemWriter实现的是FlatFileItemWriter类。此类与FlatFileItemReader类相似,也有两个重要的属性:resource和lineAggregator。前者是要输出的文件的路径,后者和lineTokenizer类似。lineAggregator(本实例用DelimitedLineAggregator类)也有两个重要的属性:delimiter和fieldExtractor。Delimiter标示输出的字段以什么分割,后者将Pojo对象组装成由Pojo对象的字段组成的一个字符串。同样FlatFileItemWriter写一条记录也有以下四步完成:1,Processor传递过来一个对象给lineAggregator;2,lineAggregator将其这个对象转化成一个数组;3,再由lineAggregator的属性fieldExtractor将数组转化成按照delimiter分割一个字符串;4,将这个字符串输出。

这样,一条数据的读、处理、写操作就基本完成了。当然,读和写也可以自己写类来处理,只是要注意继承FlatFileItemReader和FlatFileItemWriter就可以了。

实例中用到的Student类代码如下:

package ;

/** Pojo类_Student */

public class Student {

/** ID */

private String ID = "";

/** 名字 */

private String name = "";

/** 年龄 */

private int age = 0;

/** 分数 */

private float score = 0;

/*getter 和setter已删除*/

}

实例中用到的输入数据如下:

实例输出结果如下:

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

本文的配置要注意以下两点:

1, 注意Writer的resource要写成“file:******”形式,不能用“classpath:******”形式。

2, 如果将Job配置中commit-interval属性配置为大于1时,每次commit的都是最后一条记录,前面读取的被覆盖了。具体原因不明,如果将Reader的fieldSetMapper属性自己重写,就可以解决这个问题。(注:student bean添加scope属性可以解决此问题:scope:"prototype".2011/12/16)

下次,将和大家一起讨论关于XML文件的读写问题。

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

Spring Batch 之 Sample(XML文件操作)(五)

前篇关于Spring Batch的文章,讲述了Spring Batch 对CSV文件的读写操作。 本文将通过一个完整的实例,与大家一起讨论运用Spring Batch对XML文件的读写操作。实例流程是从一个XML文件中读取商品信息,经过简单的处理,写入另外一个XML文件中。

工程结构如下图:

是log处理的配置文件,与本文没有必然联系,再此不做论述。

文件内容如下:

1

2

3 xmlns:xsi="/2001/XMLSchema-instance" xmlns:p="/schema/p"

4 xmlns:tx="/schema/tx" xmlns:aop="/schema/aop"

5 xmlns:context="/schema/context"

6 xsi:schemaLocation="/schema/beans

7 /schema/beans/

8 /schema/tx

9 /schema/tx/

10 /schema/aop

11 /schema/aop/

12 /schema/context

13 /schema/context/"

14 default-autowire="byName">

15

16

17

18

19

20 class="JobLauncher">

21

22

23

24

25

26

27 class="celessTransactionManager">

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

28

29

复制代码

17行是base-spckage的指定,是spring的用法。

19-22行配置的jobLauncher用来启动Job。

24行配置的jobRepository为job提供持久化操作。

26-28行的transactionManager提供事物管理操作。

本文核心配置文件内容如下:

1

2

3 xmlns:bean="/schema/beans" xmlns:xsi="/2001/XMLSchema-instance"

4 xmlns:p="/schema/p" xmlns:tx="/schema/tx"

5 xmlns:aop="/schema/aop" xmlns:context="/schema/context"

6 xmlns:util="/schema/util"

7 xsi:schemaLocation="/schema/beans

8 /schema/beans/

9 /schema/tx

10 /schema/tx/

11 /schema/aop

12 /schema/aop/

13 /schema/context

14 /schema/context/

15 /schema/batch

16 /schema/batch/

17 /schema/util /schema/util/">

18

19

20

21

22

23

24

25 commit-interval="10">

26

27

28

29

30

31

32

33 class="entItemReader" scope="step">

34

35

36

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

37 value="file:#{jobParameters['inputFilePath']}">

38

39

40

41

42 class="entItemWriter" scope="step">

43

44

45

46 value="file:#{jobParameters['outputFilePath']}" />

47

48

49

50 class="mMarshaller">

51

52

53

54 value="" />

55

56

57

58

59

复制代码

21-29行配置了Job的基本信息。此Job包含一个Step,Step中包含了基本的读(xmlReader),处理(xmlProcessor),写(xmlWriter)。

32-38行配置了对XML文件读操作。对XML的读是由SpringBatch提供的StaxEventItemReader类来完成。要读取一个XML文件,首先要知道这个文件的存放路径,resource属性就是指定文件路径信息的。知道了文件路径,还需要知道要读取的XML的根节点名称,fragmentRootElementName属性就是指定根节点名称的。知道了根节点名称,还需要知道的一点就是怎么解析这个节点信息,unmarshaller就负责完成解析节点信息,并映射成程序pojo对象。注意,根节点并不是指整个XML文件的根节点,而是指要读取的信息片段的根节点,不管这个节点片段处在哪一层,框架都会遍历到。

49-58行配置了解析XML节点信息的unmarshaller。其中entry的key指定对应根节点名称goods,value指定程序的pojo类,这样,程序就可以将goods节点下的子节点与pojo类(Goods)中的属性去匹配,当匹配到子节点名与pojo类中的属性名相同时,就会将子节点的内容赋值给pojo类的属性。这样就完成了一个根节点的读取,框架会控制循环操作,直到将文件中所有根(goods)节点全部读完为止。这样就完成了XML文件的读操作。

41-47行配置了对XML文件的写操作。与读XML文件一样,要写一个XML文件,也是需要知道这个文件的文件的存放路径的,同样是resource属性提供文件的路径信息。同时,也是需要知道这个文件的跟节点信息的,rootTagName属性提供根节点名信息。注意此处的根节点,指整个文件的跟节点,与读得时候稍有区别,从两个属性的名称上也可以看出。有了上面的信息,完成一个写操作,还需要一个把pojo对象转换成XML片段的工具,由marshaller提供。本文读操作的unmarshaller和写操作的marshaller用的是同一个转换器,因为XStreamMarshaller既提供将节点片段转换为pojo对象功能,同时又提供将pojo对象持久化为xml文件的功能。如果写的内容与读得内容有很大差异,可以另外配置一个转换器。

文件配置的对XML文件的读写操作,至于读出来的信息做怎么样的处理再写入文件,通过简单的配置恐怕就无法完成了,就需要我们自己写代码完成了。XMLProcessor类就是完成这个工作的。只要在Job的配置文件中指定到这个类就可以了。XMLProcessor类的内容如下:

package ;

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

import ;

import ocessor;

import ent;

import ;

/**

* XML文件处理类。

*/

@Component("xmlProcessor")

public class XMLProcessor implements ItemProcessor {

/**

* XML文件内容处理。

*

*/

@Override

public Goods process(Goods goods) throws Exception {

// 购入日期变更

Day(new Date());

// 顾客信息变更

tomer(tomer() + "顾客!");

// ISIN变更

n(n() + "IsIn");

// 价格变更

ce(ce() + 1000.112);

// 数量变更

ntity(ntity() + 100);

// 处理后的数据返回

return goods;

}

}

内容很简单,再此就不做赘述了。要注意的一点就是红背景色的地方。加了此标签无须在文件增加对xmlProcessor声明用的bean,可以在job中直接引用,这是Spring的功能。当然,实现这个的前提是要在中配置base-package,只有这样才能找到。

工程结构图中的XMLLaunch类用来启动Job。内容如下:

package ;

import ;

import cution;

import ametersBuilder;

import ncher;

import ationContext;

import athXmlApplicationContext;

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

public class XMLLaunch {

/**

* @param args

*/

public static void main(String[] args1) {

ApplicationContext context = new ClassPathXmlApplicationContext(

"");

JobLauncher launcher = (JobLauncher) n("jobLauncher");

Job job = (Job) n("xmlFileReadAndWriterJob");

try {

// JOB实行

JobExecution result = (job, new JobParametersBuilder()

.addString("inputFilePath", "C:")

.addString("outputFilePath", "C:")

.toJobParameters());

// 运行结果输出

n(ng());

} catch (Exception e) {

tackTrace();

}

}

}

注意其中为Job提供的两个动态参数,以及在配置文件中的用法。

pojo类Goods的内容如下:

package ;

import rmat;

import DateFormat;

import ;

/**

* 商品信息类.

*/

public class Goods {

/** isin号 */

private String isin;

/** 数量 */

private int quantity;

/** 价格 */

private double price;

/** 客户 */

private String customer;

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

/** 购入日期 */

private Date buyDay;

/* getter he setter已经删除 */

}

文件内容如下:

处理结果如下():

下次,将和大家一起讨论关于Spring Batch 对固定长内容文件的读写问题。

Spring Batch 之 Sample(固定长格式文件读写)(六)

前篇关于Spring Batch的文章,讲述了Spring Batch 对XML文件的读写操作。 本文将通过一个完整的实例,与大家一起讨论运用Spring Batch对固定长格式文件的读写操作。实例延续前面的例子,读取一个含有四个字段的TXT文件(ID,Name,Age,Score),对读取的字段做简单的处理,然后输出到另外一个TXT文件中。

工程结构如下图:

和前文已经叙述过,在此不做赘述。

本文核心配置文件内容如下:

1

2

3 xmlns:bean="/schema/beans" xmlns:xsi="/2001/XMLSchema-in

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

stance"

4 xmlns:p="/schema/p" xmlns:tx="/schema/tx"

5 xmlns:aop="/schema/aop" xmlns:context="/schema/context"

6 xmlns:util="/schema/util"

7 xsi:schemaLocation="/schema/beans

8 /schema/beans/

9 /schema/tx

10 /schema/tx/

11 /schema/aop

12 /schema/aop/

13 /schema/context

14 /schema/context/

15 /schema/batch

16 /schema/batch/

17 /schema/util /schema/util/">

18

19

20

21

22

23

24

25

26 processor="fixedLengthProcessor" commit-interval="10">

27

28

29

30

31

32

33

34 class="leItemReader" scope="step">

35

36 value="file:#{jobParameters['inputFilePath']}" />

37

38

39 class="tLineMapper">

40

41

42

43 class="apperFieldSetMapper">

44

45

46

47

48

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

49

50

51 class="tPojo" scope="prototype" />

52

53 class="engthTokenizer">

54

55

56

57

58

59

60 class="leItemWriter" scope="step">

61

62 value="file:#{jobParameters['outputFilePath']}" />

63

64

65 class="terLineAggregator">

66

67

68 class="apperFieldExtractor">

69

70

71

72

73

74

75

76

复制代码

22-30行配置了Job的基本信息。此Job包含一个Step,Step中包含了基本的读(fixedLengthReader),处理(fixedLengthProcessor),写(fixedLengthWriter)以及commit件数(commit-interval)。

33-49行配置了读处理的详细信息。固定长格式和csv格式都属于flat文件格式,所以读取固定长格式文件也是需要使用Spring Batch提供的核心类FlatFileItemReader。对此类的配置在《Spring Batch 之 Sample(CSV文件操作)(四) 》中已经做过详细说明。但要注意lineTokenizer的配置,在读取CSV文件的时候,使用的是DelimitedLineTokenizer类,但是读取固定长格式的文件,需要使用FixedLengthTokenizer,如52-56行所示。其columns是如何分割一条记录信息,也就是说指定哪几列属于一个项目的信息(注意:列数的总长度与文件记录长度不一样的时候,会报错。注意限定范围)。属性names指定每个项目的名字。其名字与44行prototypeBeanName属性指定的Pojo属性名相同。

59-76行配置了写处理的详细信息。写固定长格式的文件,与写CSV格式的文件一样,也是使用Spring Batch提供的核心类FlatFileItemWriter。在此也不再赘述。但要注意lineAggregator属性使用的是FormatterLineAggregator类,此类的format属性可以指定每个项目所占的长度和格式。

文件配置了对固定长文件的和写。在读之后,写之前的处理,是通过自定的FixedLengthProcessor 类处理的。详细代码如下:

package ength;

import ocessor;

import ent;

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

/**

* 业务处理类。

*

* @author Wanggc

*/

@Component("fixedLengthProcessor")

public class FixedLengthProcessor implements

ItemProcessor {

/**

* 对取到的数据进行简单的处理。

*

* @param student

* 处理前的数据。

* @return 处理后的数据。

* @exception Exception

* 处理是发生的任何异常。

*/

public StudentPojo process(StudentPojo student) throws Exception {

/* 合并ID和名字 */

e(() + "--" + e());

/* 年龄加2 */

(() + 2);

/* 分数加10 */

re(re() + 10);

/* 将处理后的结果传递给writer */

return student;

}

}

至此,对固定长格式文件的读、处理、写操作已经介绍完毕。下面是一些辅助文件的信息。

Pojo类StudentPojo的详细代码如下:

package ength;

/** Pojo类_Student */

public class StudentPojo {

/** ID */

private String ID = "";

/** 名字 */

private String name = "";

/** 年龄 */

private int age = 0;

/** 分数 */

private float score = 0;

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

/* 为节省篇幅,getter 和 setter 已经删除 */

}

Job启动类Launch的详细代码如下:

package ength;

import ;

import cution;

import ametersBuilder;

import ncher;

import ationContext;

import athXmlApplicationContext;

public class Launch {

public static void main(String[] args) {

ApplicationContext context = new ClassPathXmlApplicationContext(

"");

JobLauncher launcher = (JobLauncher) n("jobLauncher");

Job job = (Job) n("fixedLengthJob");

try {

// JOB实行

JobExecution result = (

job,

new JobParametersBuilder()

.addString("inputFilePath",

"C:")

.addString("outputFilePath",

"C:")

.toJobParameters());

// 运行结果输出

n(ng());

} catch (Exception e) {

tackTrace();

}

}

}

input文件内容如下:

处理结果如下:

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

下次,将和大家一起讨论关于Spring Batch 对复合格式文件的读写问题。

Spring Batch 之 Sample(复合格式文件的读、多文件的写)(七)

前面关于Spring Batch的文章,讲述了SpringBatch对CSV文件的读写操作、对XML文件的操作,以及对固定长格式文件的操作。这些事例,同一个Reader读取的都是相同格式的数据,最终写入一个文件。如果遇到下面这样的数据,并想将学生信息和商品信息分类后写入两个文件,应该如何处理呢?

student,200001,ZhangSan,18,78

goodsPNH.1zhangshana2011/12/18 01:12:36

student,200002,LiSi,19,79

goodsPNH.1zhangshanb2011/12/19 01:12:36

student,200003,WangWu,20,80

goodsPNH.1zhangshanc2011/12/20 01:12:36

* 以student开头的数据代表学生信息,以goods开头代表商品信息

这次将和大家一起探讨Spring Batch读取复合格式的数据,然后写入不同的文件的处理方式。

工程结构如下图:

和前文已经叙述过,在此不做赘述。

本实例的核心配置文件内容如下:

1

2

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

3 xmlns:bean="/schema/beans" xmlns:xsi="/2001/XMLSchema-instance"

4 xmlns:p="/schema/p" xmlns:tx="/schema/tx"

5 xmlns:aop="/schema/aop" xmlns:context="/schema/context"

6 xmlns:util="/schema/util"

7 xsi:schemaLocation="/schema/beans

8 /schema/beans/

9 /schema/tx

10 /schema/tx/

11 /schema/aop

12 /schema/aop/

13 /schema/context

14 /schema/context/

15 /schema/batch

16 /schema/batch/

17 /schema/util /schema/util/">

18

19

20

21

22

23

24

25 commit-interval="1">

26

27

28

29

30

31

32

33

34

35

36

37 class="leItemReader" scope="step">

38

39 value="file:#{jobParameters['inputFilePath']}" />

40

41

42 class="nMatchingCompositeLineMapper">

43

44

45

46

47

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

48

49

50

51

52

53

54

55

56

57

58

59 class="tedLineTokenizer">

60

61

62

63 student

64 ID

65 name

66 age

67 score

68

69

70

71

72 class="apperFieldSetMapper">

73

74

75

76

77

78 class="t"

79 scope="prototype" />

80

81

82 class="engthTokenizer">

83

84

85 value="isin,quantity,price,customer,buyDay" />

86

87

88 class="apperFieldSetMapper">

89

90

91

92

93 class=""

94 scope="prototype" />

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

95

96

97 class="temWriter">

98

99

100

101

102

103

104

105

106

107 class="leItemWriter" scope="step">

108

109 value="file:#{jobParameters['outputFilePathStudent']}" />

110

111

112 class="terLineAggregator">

113

114

115 class="apperFieldExtractor">

116

117

118

119

120

121

122

123

124

125 class="leItemWriter" scope="step">

126

127 value="file:#{jobParameters['outputFilePathGoods']}" />

128

129

130 class="tedLineAggregator">

131

132

133 class="apperFieldExtractor">

134

135 value="isin,quantity,price,customer,buyDay" />

136

137

138

139

140

141

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

复制代码

21-33行配置了Job的基本信息。

36-57行配置了Reader的基本信息。FlatFileItemReader的lineMapper属性使用SpringBatch核心类PatternMatchingCompositeLineMapper的时候,会将读取的记录按照不同的方式映射成我们的Pojo对象。当然首先我们要配置不同的tokenizers(43-48)和fieldSetMappers(49-54),并告诉它当前的记录按照那条原则去解析和映射。如45行所示,我们指定key为student*的时候,用studentTokenizer去解析成fieldset,用studentFieldSetMapper将studentTokenizer解析好的fieldset记录映射成Student对象。我们指定的key,其实也就是student开头的记录,*是通配符。PatternMatchingCompositeLineMapper支持两种通配符:*和?,前者代表多个字符,后者仅代表一个字符。至于student和goods信息如何映射成pojo对象,前面的文章中已经做过详细的介绍,这里就不做赘述了。

96-104行配置了Writer的基本信息。Writer也是使用代理的方式,学生信息使用106-122行定义的studentWriter按照固定长的格式写入学生信息文件中,商品信息使用124-141行定义的goodsWriter按照CSV的格式写入商品信息文件中。MultiItemWriter的代码很简单,就不做详细解释了。如下:

package ypessinglefile;

import ist;

import ;

import iter;

import ;

import t;

/**

* 写处理类。

*

* @author Wanggc

*

* @param

*/

@SuppressWarnings("unchecked")

public class MultiItemWriter implements ItemWriter {

/** 写代理 */

private List> delegates;

public void setDelegates(List> delegates) {

tes = delegates;

}

@Override

public void write(List items) throws Exception {

// 学生信息的Writer

ItemWriter studentWriter = (ItemWriter) (0);

// 商品信息的Writer

ItemWriter goodsWriter = (ItemWriter) (1);

// 学生信息

List studentList = new ArrayList();

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

// 商品信息

List goodsList = new ArrayList();

// 将传过来的信息按照不同的类型添加到不同的List中

for (int i = 0; i < (); i++) {

if ("Student".equals((i).getClass().getSimpleName())) {

((Student) (i));

} else {

((Goods) (i));

}

}

// 如果学生List中有数据,就执行学生信息的写

if (() > 0) {

(studentList);

}

// 如果商品List中有数据,就执行商品信息的写

if (() > 0) {

(goodsList);

}

}

}

至此,复合文件的读写操作已经讨论结束了。注意实例没有配置Processor。下面是一些辅助文件的信息。

student和goods类的信息与前面文章一样,就不再贴出代码了。

Job启动的代码如下:

package ypessinglefile;

import ;

import cution;

import ametersBuilder;

import ncher;

import ationContext;

import athXmlApplicationContext;

public class Launch {

public static void main(String[] args) {

ApplicationContext context = new ClassPathXmlApplicationContext(

"");

JobLauncher launcher = (JobLauncher) n("jobLauncher");

Job job = (Job) n("multiTypeSingleFileJob");

try {

// JOB实行

JobExecution result = (

job,

new JobParametersBuilder()

.addString("inputFilePath",

"C:")

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

.addString("outputFilePathStudent",

"C:")

.addString("outputFilePathGoods",

"C:")

.toJobParameters());

// 运行结果输出

n(ng());

} catch (Exception e) {

tackTrace();

}

}

}

Input文件内容如下图:

处理结果的学生信息文件如下图:

处理结果的商品信息文件如下图:

Spring Batch对复合格式文件的读写操作就讨论到这里。至此,Spring Batch对文件简单操作的讨论也告一段落,下次将讨论Spring Batch读写DB的操作。

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

Spring Batch 之 Sample(游标方式读写DB数据表)(八)

前面关于Spring Batch的文章,讲述了SpringBatch对Flat、XML等文件的读写操作,本文将和大家一起讨论Spring

Batch对DB的读写操作。Spring Batch对DB数据的读取操作提供两种形式,一种是以游标为基础,一条条的读取数据;另外一种是分页的方式读取DB。

通过前面文章的讲解,大家应该对SpringBatch的框架和基本配置有了一定的了解。为了不显得啰嗦,本文只提供读取DB方式的核心配置,一些辅助信息的配置,如果不明白,可以参照前面几讲的内容。

首先是读操作:

1

2 class="rsorItemReader"

3 scope="step">

4

5

6 value="select ID,USERID,USERNAME,PASSWORD from T_USER where id < ?" />

7

8

9

10 value="" />

11

12

13

14

15

16

17 class="eparedStatementSetter"

18 scope="step">

19

20

21 #{jobParameters['id']}

22

23

24

复制代码

Spring Batch对DB基于游标的读取数据操作,是由其核心类JdbcCursorItemReader来实现的。一般来说,从DB数据表中读取数据一般有以下几个步骤。首先是DB连接,这些连接DB的基本信息(像DB服务器地址、用户名、密码等信息)由dataSource属性提供,SpringBatch没有提专门供特殊的类,用的是Spring框架的DataSource。连接上了DB,下面需要关注的就是对DB具体的查询操作了,也就是SQL文的相关信息了,由其sql属性实现,主要是拼接SQL文的一个字符串。有了sql文,有可能需要传递参数,sql文参数的一些信息,由preparedStatementSetter属性来满足,具体的实现可以由SpringBatch提供的核心类ListPreparedStatementSetter来设置,如代码的16-24行。连接上了DB,执行了SQL文,最后要关心的就是查询结果的存放问题了,这一点由JdbcCursorItemReader的rowMapper属性来实现,如代码的7-12行。这样就可以将DB数据表中的数据一条条映射成我们的Pojo对象了,也就完成了读操作。

其次是写操作:

1

2 class="tchItemWriter">

3

4

5 value="insert into T_DESTUSER (ID,USERID,USERNAME,PASSWORD,UPDATETIME,UPDATEUSER)

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

6 values

7 (:id,:userId,:userName,:password,:updateDate,:updateUser)" />

8

9

10 class="opertyItemSqlParameterSourceProvider" />

11

12

复制代码

写DB和读DB思路是一样的,只不过一个是从DB里读,一个是往DB里写。是由SpringBatch框架的JdbcBatchItemWriter类实现的。也有以下几个步骤:首先是连接DB,也是由dataSource属性提供;其次是执行的SQL文,有sql属性满足,最后就是如何传递参数了。写操作传递参数的时候,跟读操作有一定区别。写操作提供两种传递参数的方式:一种是直接传递一个对象进去,如上述代码8-11行所示,itemSqlParameterSourceProvider属性设置为BeanPropertyItemSqlParameterSourceProvider的实现就可以了。SQL文中的参数也是使用【:对象属性名】的方式,如上述代码的7行所示,并且区分大小写。第二种传递参数的方式是:设置JdbcBatchItemWriter类的itemSqlParameterSourceProvider属性(设置方式与读操作的paramStatementSetter属性的设置方式大同小异),当然,sql也和第一种方式有区别,使用如下形式:insert into T_DESTUSER

(ID,USERID,USERNAME,PASSWORD,UPDATETIME,UPDATEUSER) values (?,?,?,?,?,?).两种方式各有优势,根据实际情况自行选择。

至此,SpringBatch读写DB的简单操作就介绍完了。下次将和大家一起讨论开发中遇到的一些高级特性和实际问题。

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

Spring Batch 之 skip讲解(九)

前面的文章跟大家一起讨论了Spring Batch的概念,处理流程,以及SpringBatch处理文件、DB的一些简单实例。接下来的讨论,主要是关于Spring Batch的一些高级应用处理和实际开发中需要注意的一些问题。

今天主要和大家讨论SpringBatch关于skip容错机制的一些处理。

一、skip的介绍

在实际的项目开发中,我们常常要将几十万甚至上百万的数据从文件导入到DB中,如果其中某条数据导入时发生例外,我们并不想整个Job以失败而结束,而是希望能将错误的数据经过处理后保存起来,其余正确的数据继续做导入处理。如果遇到这样的场景,SpringBatch的skip机制就可以派上用场了。顾名思义,skip的作用就是跳过某些数据(例如错误数据)。

二、配置skip信息

配置skip的示例代码如下:

1

2

3

4

5 commit-interval="1" skip-limit="1000">

6

7

8

9

10

11

12

复制代码

代码第5行chunk的skip-limit属性是指允许跳过记录的行数,6-8行是指允许发生的例外,也就是说在发生FlatFileParseException(及其子类)的时候,job是不会被终止的,而是跳过当前的记录,去执行下面那条记录。 上面的代码也会有另外一个问题,就是发生FlatFileParseException以外例外的时候,Job也会失败。这也满足不了我们上面说的那种场景,当然,6-8行还有另外一种配置方式,如下:

1

2

3

4

复制代码

include是允许跳过的错,exclude是不允许跳过的错。如果像上诉代码那样配置的话,所有Exception及其子类(FileNotFoundException除外)发生时,Job都不会被终止;但是当FileNotFoundException发生时,虽然它也是Exception的子类,但Job会被终止,因为FileNotFoundException属于exclude属性的class。

三、skip深入讲解

是谁在决定当前的记录跳过与否呢?其实,当Reader、Processor和Writer抛出例外的时候,SpringBatch会调用skip机制,来判断当前例外发生时,正在被处理的记录是否被跳过。当在上面的代码中配置skippable-exception-classes属性的时候,SpringBatch会默认的调用LimitCheckingItemSkipPolicy类。如果简单的配置skip-limit和skippable-exception-classes不能满足需求时,也可以定义自己的skip策略。代码如下:

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

1 package ;

2

3 import mitExceededException;

4 import licy;

5

6 /**

7 * 自定义Skip策略类。

8 * @author Wanggc

9 */

10 public class MySkipPolicy implements SkipPolicy {

11

12 @Override

13 public boolean shouldSkip(Throwable t, int skipCount)

14 throws SkipLimitExceededException {

15 // TODO Auto-generated method stub

16 return false;

17 }

18 }

复制代码

如示例代码所示,要实现SkipPolicy接口,在shouldSkip方法中定义自己的skip策略。返回false时,说明当前例外不能被跳过,否则可以被跳过。当然,定义了自己的skip策略还不够,还要告诉框架要使用自己定义的skip策略,而不是框架默认的。这就需要添加chunk的另外一个属性skip-policy。代码如下:

1

2

3

4

5 commit-interval="1" skip-limit="1000" skip-policy="mySkipPolicy">

6

7

8 class="leParseException" />

9

10

11

12

13

14

复制代码

添加了skip-policy属性后,skip-limit和skippable-exception-classes默认策略将不再起作用。当然,可以将其删除,示例中属于垃圾代码。

当Reader、Processor和Writer抛出例外的时候,SpringBatch处理skip策略的方式是不同的。当Reader发生可以被skip的例外时,SpringBatch会接着去读下面一条记录,并不会回滚事务。当Processor发生可以被skip的例外时,SpringBatch会回滚当前chunk的事务,并将除了引发例外以外的数据传给Writer。当Writer发生可以被skip的例外的时,SpringBatch首先回滚事务,因为传给Writer的是一个list,所以Writer不知道是list中那条记录造成了例外的发生。Writer会将list拆开,一条条的处理,正确的数据提交,错误的数据回滚。

对SpringBatch的skip机制的讨论就到这里了,接下来会讨论其他一些高级属性。

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

Spring Batch 之 JobParameters (十)

继续前面关于Spring Batch系列的文章,本文主要介绍与JobParameters相关的一些知识。

一、JobParameters

顾名思义,所谓JobParameters,就是Job运行时的参数。它在bath中有两个作用:一是标示不同的jobInstance,二是作为job中用到的信息,以参数的形式传给job。

如何使用JobParameters呢?它主要是在启动的job的时候,与job联系起来的。看一下框架提供的启动job的接口JobLauncher的源代码,就会发现其run方法需要两个参数,一个是Job,也就是需要启动的job,另一个就是JobParameters。可以通过如下方式使用:

(job, new JobParametersBuilder()

.addString("para1", "value1")

.addString("para2","value2")

.toJobParameters()

);

如代码所示,参数para1和para2就可以传给Job了,在Job中如果需要使用参数信息,可以使用Spring注入的方式传给不同的使用对象。

注意需要设置Bean的scope属性为step,这是SpringBatch的一个后绑定技术,就是在生成Step的时候,才去创建bean,因为这个时候jobparameter才传过来。如果加载配置信息的时候就创建bean,这个时候jobparameter的值还没有产生,会抛出异常。

二、JobParametersValidator

SpringBatch框架支持JobParameters的简单验证,并提供了JobParametersValidator接口和validate方法,用于在job启动之前对参数信息验证和检查。如果需要对参数进行验证,就可以实现此接口,并在validate方法中定制验证规则,当验证失败的时候,会抛出JobParametersInvalidException异常。当然,SpringBatch框架也提供了一个默认的验证类DefaultJobParametersValidator,但此类验证功能有限,主要用于必须项和非必须项的验证。通过如下配置可以实现此验证功能。

......

class="tJobParametersValidator">

para1

para2

Generated by Unregistered Batch DOC TO PDF Converter 2010.2.301.1358, please register!

para3

para4

DefaultJobParametersValidator类提供两个重要属性:requiredKeys和optionalKeys,前者为JobParameters中必须包含的项,但对参数的实际值不做check(不做空的check),只要包含此项的key就可以了;后者为可选项,有没有此项都可以。但如果JobParameters中含有两者以外的项,也会抛出JobParametersInvalidException异常。

三、JobParametersIncrementer

JobParametersIncrementer主要用于JobOperator接口的startNextInstance等方法启动job的情况下。同一个Job在batch启动后被多次调用的场合,startNextInstance方法将会非常有用,因为它将使用JobParametersIncrementer与Job绑定,创建一个新实例。因为JobParametersIncrementer有一个getNext方法,可以在此方法中为parameters添加一个自增的值,以区分不同的Job实例,当然,这个值在job的其他的地方并不会用到,仅仅是为了标示不同JobInstance。当然SpringBatch框架也为我们提供了一个JobParametersIncrementer的实现类RunIdIncrementer 。使用方法如下:

......

class="ncrementer"/>

RunIdIncrementer的getNext方法实现如下:

public JobParameters getNext(JobParameters parameters) {

if (parameters == null) {

parameters = new JobParameters();

}

long id = g(key, 0L) + 1;

return new JobParametersBuilder(parameters).addLong(key, id).toJobParameters();

}

由代码可以看出,当parameters为null时,创建一个新的JobParameters,并添加一项“key”;不为null时,直接给原来的parameters添加一项。最后直接返回。因为key代表的value每次是都会在原来的基础上加1,这样就保证了每次创建的jobInstance是不同的了。

以上,就是JobParameters主要知识点的介绍,下次,将介绍SpringBatch其他的一些技术知识点。


本文标签: 文件 处理 信息 数据 配置