지금까지의 과정을 요약해보자.
먼저 AsyncItemProcessor / AsyncItemWrite를 이용하는 방식은 기본적으로 read는 동기적으로 처리하면서, processor와 write만 비동기적으로 처리하는 것이었다.
다음으로 multi-thread는 read, process, write를 모두 비동기적으로 처리하는 것으로, 한 step 내에서 reader, processor, writer를 chunk단위로 병렬수행해야 할 때 사용한다.
그렇다면 위 두가지 방식의 공통점은 무엇일까 ? 바로 병렬처리가 step내부 단위에서 일어난다는 것이다 !
그런데 Job에는 여러가지의 Step이 있을 수 있으므로, 이 Step들 또한 병렬적으로 처리해야 하는 경우가 있다. 지금부터 이런 경우에 대해 알아보자.
Paralle Steps가 바로 Step을 병렬로 처리할 때 사용하는 방법이다.
Job은 기본적으로 step을 한번에 하나씩 선형적으로 실행하지만, 이 방법을 사용해서 병렬로 step을 처리할 수 있다.
parallel steps 에서는, flow라는 개념을 적극적으로 사용한다. step을 flow로 한번 감싸주고, 이 flow들이 병렬로 수행된다고 이해하면 좋을 것이다.
그런데 여기서 특이한 건 step4다. 기본적으로 step4는 병렬처리가 되지 않으며, step1과 step2, step3가 동시에 병렬처리가 되며, 이 모든게 끝나나 뒤에 step1의 후속작업인 step4가 수행되는 것이다.
private Object lock = new Object();
synchronized (lock) {
for (int i = 0; i < 1000000000; i++) {
sum++;
}
// thread 이름 출력
System.out.println(String.format("%s has been executed on thread %s",
chunkContext.getStepContext().getStepName(),
Thread.currentThread().getName()));
System.out.println(String.format("sum : %d", sum));
}
외부 remote 서버에서 병렬 수행해야 할 때 사용한다.
지금까지는 모두 스프링 배치를 Spring Integration이 감싸서 프로세스가 수행되는 식이었다.
그러나 이런 Spring Integration도 스프링 배치 내부에서 사용될 수 있다 !
이를 통해 아이템, 청크 처리를 외부 프로세스로 위임할 수 있다.
이와 같은 방식으로 remote Chunking과 remote Partitioning이 있는데, 둘을 각각 알아보자.
remote chunking은 상단과 같이 구성된다고 할 수 있다.
remote chunking을 사용하기 위해서는 먼저 step을 여러 프로세스로 나눈다. 그리고 나눈 프로세스를 다른 미들웨어에 할당하여 사용한다.
여기서 step1, step2, step3를 수행하는 것을 매니저 컴포넌트, 하단의 step2a, step2b, step2c를 수행하는 것을 워커라고 해보자.
이때 매니저 컴포넌트는 싱글 프로세스이고 워커는 멀티 리모트 프로세스다. 이 패턴은 매니저에 병목이 없어야 가장 잘 동작하므로, 반드시 process가 read보다 훨씬 무거운 작업일 때 사용해야 한다. (프로세스가 가볍다면 굳이 나누어서 remote에 위임할 필요가 없겠지 ?)
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader()) //**1**
.outputChannel(requests()) // **2** requests sent to workers
.inputChannel(replies()) // **7**replies received from workers
.build();
}
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor()) // **3**
.itemWriter(itemWriter()) // **4**
.inputChannel(requests()) // **5** requests received from the manager
.outputChannel(replies()) // **6** replies sent to the manager
.build();
}
}
}
1. manager가 item Reader를 사용해 item을 읽는다.
2. worker에게 request를 보낸다.
3~4. worker는 process, write작업을 한다.
5~6. manager로부터 request를 받고, 이에 대한 reply를 보낸다.
7. manager는 worker로 부터 request를 받는다.