前回はSQSへキューを送信するProducerアプリケーションを実装しました。今回から2回に渡り、SQSメッセージをポーリングして取得するConsumerアプリケーションから実行されるバッチ処理を「Spring Batch」を使って実装していきます。
SpringBatchを使用したサンプルアプリケーション
今回作成するSpringBatchを使ったアプリケーションは、以下の図のようなイメージで構成します。
SpringBatchの基本的な内容について、ここでは詳細な説明は行いません。必要に応じて、TERASOLUNAガイドラインの「SpringBatchのアーキテクチャ」を参照してください。ただし、このガイドラインで扱っている「TERASOLUNA Batch」では、SpringBatchを使った実装のベストプラクティスの提供に加えて、SpringBatchでは提供されていないDBポーリングによる ジョブの別プロセス非同期実行機能をライブラリとして提供していますが、本連載では、純粋にSpringBatchのみを使って実装します。
また、ガイドラインでは、SpringBootを使ってSpring Batchアプリケーションを実装する方法については述べられていませんが、本連載では、JavaのConfigクラスを使って、SpringBootをベースとした設定方法を扱います。なお、SpringBatchの処理起動はメインクラスによる実行とSQSのポーリングでメッセージを取得後実行する2パターン作成しますが、SQSへのポーリングに関しては、次回以降で解説します。
本連載で実際に作成するSpringBatchアプリケーションは、GitHub上にコミットしています。 以降に記載するソースコードでは、import文など本質的でない記述を省略している部分があるので、実行コードを作成する際は、必要に応じて適宜GitHubにあるソースコードも参照してください。
SpringBootを使ってSpringBatchアプリケーションを作成するには、前回同様、Mavenプロジェクトのpom.xmlでspring-boot-starter-batchのライブラリを追加してください。 また、モデルオブジェクトを簡素化する目的でLombokライブラリを、バッチジョブのリスタートや実行結果の管理のために作られるJobRepository向けのデータベースとしてH2を追加します。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
</dependencies>
それでは、SpringBatchアプリケーションの実装に進みます。今回作成するアプリケーションのコンポーネント構成は、以下のようにバッチ処理を構成する上でよく使用される処理で構成してみます。
コンポーネント | 説明 | 必須 |
---|---|---|
SpringBatchApplication | SpringBootアプリケーションを実行する起動クラス。単体で実行するために作成 | ― |
BatchAppConfig | バッチ処理の実行単位で作成するBean定義クラス | ◯ |
BatchInfraConfig | ジョブ管理テーブルへのデータアクセスを定義するクラス | ◯ |
SampleTasklet | ジョブの最初のステップで前処理として実行するTaskletクラス | ― |
SampleProcessor | ジョブのステップで実行するProcessorクラス | ― |
SampleWriter | ジョブのステップで実行するWriterクラス | ― |
SamplePartitioner | ProcessorやWriterをマルチスレッドで並列実行する場合の分割単位を定義するクラス | ― |
SampleListener | ジョブの実行前後で呼び出されるListenerクラス | ― |
SpringBatchの設定クラス実装
それでは、実装していくクラスについて説明します。まずは、SpringBoot起動クラスと、各種設定クラスです。
@SpringBootApplicaitonアノテーションが付与された起動クラスは、同一パッケージにある@Configurationアノテーションが付与された設定クラスと、 設定クラス内で@ComponentScanされたパッケージにあるクラスを読み取ります。ここでは、起動クラスと別パッケージにある設定クラスを読み取るために、 @Importを使用します。読み取るクラスは、以下の2つを作成します。
- Batch処理を定義する設定クラス:BatchAppConfigクラス
- ジョブ実行に必要なリソースを定義するクラス:BatchInfraConfigクラス
設定クラスは必ずしも複数である必要はなく、1つにまとめても動作上問題ありません。ただし、クラス名と役割を対応付けて作成しておいたほうが、後々混乱することなく、クラス名から設定内容を識別できるのでベターでしょう。
なお、コンポーネントスキャンを使わずに直接設定クラスを読み込む理由は、 通常バッチは単体の処理で完結するので、必要のないジョブコンポーネントの読み込みを回避して、パフォーマンスを向上させるためです。 また、下記の実装はBatchアプリケーションをローカル単体でテスト実行できるようなかたちで実装したものになります(次回以降解説するConsumerアプリケーションでは、こちらのクラスからは起動しません)。
package org.debugroom.mynavi.sample.aws.sqs.app.batch.launcher;
import org.debugroom.mynavi.sample.aws.sqs.config.BatchAppConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Import;
@Import(BatchAppConfig.class)
@SpringBootApplication
public class SpringBatchApplication {
public static void main(String[] args) {
String inputParam = "param=1";
new SpringApplicationBuilder(SpringBatchApplication.class)
.web(WebApplicationType.NONE)
.run(new String[]{inputParam});
}
}
起動クラスから読み込む「Batchの処理定義を記載したBatchAppConfigクラス」はバッチジョブ単位に作成し、起動クラスとジョブを1:1で組み合わせて、@Importで読み込んで実行するとよいでしょう。コードは以下の通りです。
package org.debugroom.mynavi.sample.aws.sqs.config;
// omit
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
@Import(BatchInfraConfig.class) // ...(A)
@Configuration
@EnableBatchProcessing // ...(B)
public class BatchAppConfig extends DefaultBatchConfigurer { // ...(C)
@Autowired
JobBuilderFactory jobBuilderFactory; // ...(D)
@Autowired
StepBuilderFactory stepBuilderFactory; // ...(E)
@Bean
public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2){
return jobBuilderFactory.get("job")
.listener(jobExecutionListener())
.start(step1)
.next(partionStep())
.build(); // ...(F)
}
@Bean
public Step step1(){
return stepBuilderFactory
.get("step1")
.tasklet(sampleTasklet())
.build(); // ...(G)
}
@Bean
protected Step step2(){
return stepBuilderFactory.get("step2")
.<Sample, Sample>chunk(10)
.reader(sampleFlatFileItemReader(null))
.processor(sampleProcessor())
.writer(sampleWriter())
.build(); // ...(H)
}
@Bean
protected Step partionStep(){
return stepBuilderFactory.get("partitionStep")
.partitioner(step2())
.partitioner("step2", partitioner(null))
.taskExecutor(taskExecutor())
.build(); // ...(I)
}
@Bean
@StepScope
@Value("#{jobExecutionContext['paramBySampleTasklet']}") // ...(J)
public FlatFileItemReader<Sample> sampleFlatFileItemReader(String paramBySampleTasklet){
FlatFileItemReader<Sample> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setResource(new DefaultResourceLoader().getResource(paramBySampleTasklet));
DefaultLineMapper<Sample> defaultLineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
delimitedLineTokenizer.setNames("stepParam");
defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
BeanWrapperFieldSetMapper<Sample> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<>();
beanWrapperFieldSetMapper.setTargetType(Sample.class);
defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
flatFileItemReader.setLineMapper(defaultLineMapper);
return flatFileItemReader; // ...(K)
}
@Bean
@StepScope
protected ItemProcessor<Sample, Sample> sampleProcessor(){
return new SampleProcessor(); // ...(L)
}
@Bean
@StepScope
protected ItemWriter<Sample> sampleWriter(){
return new SampleWriter(); // ...(M)
}
@Bean
protected Tasklet sampleTasklet(){
return new SampleTasklet(); // ...(N)
}
@Bean
protected JobExecutionListener jobExecutionListener(){
return new SampleListener(); // ...(O)
}
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setConcurrencyLimit(10);
return simpleAsyncTaskExecutor; // ...(P)
}
@Bean
@StepScope
@Value("#{jobExecutionContext['paramBySampleTasklet']}")
public Partitioner partitioner(String paramBySampleTasklet){
return new SamplePartitioner(paramBySampleTasklet); // ...(Q)
}
@Override
@Autowired
public void setDataSource(@Qualifier("batchDataSource") DataSource dataSource){
super.setDataSource(dataSource); // ...(R)
}
}
こうした設定クラスはバッチジョブ単位に作成し、起動クラスとジョブを1:1で組み合わせて、@Importで読み込んで実行するとよいでしょう。
設定クラスコードの説明は以下の通りです。
項番 | 説明 |
---|---|
A | BatchInfraConfig設定クラスをインポートします。 |
B | @EnableBatchProcessingを付与することで、Batch設定クラスとして扱われます。この設定によりJobRepositoryといったSpringBatchの主要なコンポーネントが自動で構築されていくようになります。 |
C | 今回のサンプルではリスタートや実行結果の管理のために作られるJobRepository用のDataSourceをインメモリDBへ置き換えるために使用します。DefaultBatchConfigurerを継承することにより、当設定クラス内でデータソースを置き換えるようオーバーライドします(Rを参照) |
D | ジョブ内でステップ処理フローを定義するためにJobBuildFactoryをインジェクションします |
E | ステップ内で実行する処理を定義するためにStepBuildFactoryをインジェクションします |
F | ジョブのステップ処理フローを定義します。ここでは、"job"というジョブ名に、リスナーとタスクレットを実行する"step1"、Readerとパーティショナーを組み込んだProcessor、Writerを実行する"step2"を定義しています |
G | ステップ処理を定義します。ここでは、"step1"というstep名に、SampleTaskletクラスを定義しています |
H | ステップ処理を定義します。ここでは、"step2"というstep名に、IOモデルクラス、チャンクサイズ、Reader、Processor、Writerクラスを定義しています |
I | "step2"をパーティション化して実行するステップを定義します。ここでは、Partitionerクラスおよび、各ステップを実行するTaskExecutorを定義しています |
J | Step間で共有するパラメータを設定します。ここではJobExcutionContext内に"paramBySampleTasklet"というパラメータをSampleTaskletクラスで設定し、FlatFileItemReaderの実行引数として受け取るかたちで実装しています |
K | (H)で設定した通り、SpringBatchから提供されているFlatFileItemReaderで型パラメータとして、モデルクラスとなるSampleクラスを指定してStepScopeで定義します。ここで実装している内容は次回改めて解説します |
L | (H)で設定した通り、ProcessorとしてSampleProcessorクラスをBean定義します |
M | (H)で設定した通り、WriterとしてSampleWriterクラスをBean定義します |
N | (G)で設定した通り、TaskletとしてSampleTaskletクラスをBean定義します |
O | (F)で設定した通り、リスナーとしてSampleListenerクラスをBean定義します |
P | (I)で設定した通り、パーティショナーを使って並列実行を行う場合のTaskExecutorクラスとして、SimpleAsyncTaskExecutorをBean定義します |
Q | (I)で設定した通り、Partitionerクラスとして、SamplePartitionerクラスをBean定義します。JobExcutionContext内に"paramBySampleTasklet"というパラメータ(パーティションキーの元となる読み込むファイルのパス)をコンストラクタインジェクションしてBean生成します |
R | (C)で設定した通り、リスタートや実行結果の管理のために作られるJobRepository用のDataSourceをインメモリDBへ置き換えるかたちで設定します |
また、BatchInfraConfigクラスには複数のジョブ設定クラスから使用される内容を定義します。ここでは、JobRepositoryのためのデータソースをインメモリDBとしてH2を定義します。
package org.debugroom.mynavi.sample.aws.sqs.config;
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@Configuration
public class BatchInfraConfig {
@Bean(name="batchDataSource")
public DataSource jobRepositoryEmbeddedDataSource(){
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("classpath:/org/springframework/batch/core/schema-h2.sql")
.build();
}
}
これで、Batchアプリケーションの設定クラスが作成できました。次回は、TaskletやProcessorなどのバッチ処理クラスを実装します。
著者紹介
川畑 光平(KAWABATA Kohei) - NTTデータ 課長代理
金融機関システム業務アプリケーション開発・システム基盤担当を経て、現在はソフトウェア開発自動化関連の研究開発・推進に従事。
Red Hat Certified Engineer、Pivotal Certified Spring Professional、AWS Certified Solutions Architect Professional等の資格を持ち、アプリケーション基盤・クラウドなどさまざまな開発プロジェクト支援にも携わる。2019 APN AWS Top Engineers & Ambassadors選出。
本連載の内容に対するご意見・ご質問は Facebook まで。