๐น ์์น๊ธ ์ฐจ์ก ๋น๊ต Spring Batch ๋ฆฌํํ ๋ง
๊ธฐ์กด ์์น๊ธ ์ฐจ์ก๋น๊ต Batch์ Tasklet๋ฐฉ์์ ๋ฐฐ์น์ ๋จ์ ์ ๋ณด์ํ๋ chunk ๋ฐฉ์์ ๋ฐฐ์น๋ฅผ ๊ตฌํํ๊ณ , ๋์๊ฐ ๋ค๋ฅธ ๊ธฐ๋ฅ์ Batch์๋ ํจ๊ณผ์ ์ผ๋ก ๋น ๋ฅด๊ฒ ์ ์ฉํ ์ ์๋์ฌ์ฌ์ฉ์ฑ/์ ์ง๋ณด์์ฑ ๋์ ์ฝ๋, ์ ๋ก๋ฅผ ๋ง๋ค๊ธฐ ์ํจ์ด๋ค.
ํญ๋ชฉ | Partitioning (Spring Batch) | ParallelStream, CompletableFuture |
---|---|---|
์ ์ฉ ๊ณ์ธต | Batch Job์ Step ๋ ๋ฒจ | ๋น์ฆ๋์ค ๋ก์ง ๋๋ ์๋น์ค ๊ณ์ธต |
์ฃผ์ ๋ชฉ์ | ๋์ฉ๋ ๋ฐ์ดํฐ๋ฅผ ๋ณ๋ ฌ ์ฒ๋ฆฌํ์ฌ Batch ์๋ ๊ฐ์ | ์ฐ์ฐ ์ต์ ํ, ๋น๋๊ธฐ ์ฒ๋ฆฌ, ์๋ต ์๊ฐ ๋จ์ถ |
ํธ๋์ญ์ ๊ด๋ฆฌ | Spring Batch์์ ์ ๊ณต (ํธ๋์ญ์ ๋ถ๋ฆฌ๋จ) | ๊ฐ๋ฐ์๊ฐ ์ง์ ๊ด๋ฆฌํด์ผ ํจ |
์ฌ์์ ๋ฐ ๋ณต๊ตฌ | Spring Batch๊ฐ ์ง์ | ์๋์ผ๋ก ๊ตฌํ ํ์ |
์ค๋ฒํค๋ | Spring Batch ์ปจํ ์คํธ ๋ฐ ํํฐ์ ์ค์ ์ค๋ฒํค๋ | ์๋์ ์ผ๋ก ๊ฐ๋ณ์ง๋ง ํธ๋์ญ์ ์์ |
๋ณต์ก๋ | ์ค์ ๊ณผ ํํฐ์ ์ ์๊ฐ ๋ณต์กํจ | ๊ตฌํ์ด ๋จ์ํ๊ณ ์ง๊ด์ |
์๋ฌ ํธ๋ค๋ง | Spring Batch์์ ๊ด๋ฆฌ | ๊ฐ๋ฐ์๊ฐ ๋ณ๋ ํธ๋ค๋ง ํ์ |
์ค๋ ๋ ๊ด๋ฆฌ | Spring Batch์ PartitionHandler ๊ด๋ฆฌ | ForkJoinPool ๋๋ ์ค๋ ๋ ํ ์ง์ ๊ด๋ฆฌ |
์ํ ๊ด๋ฆฌ | ExecutionContext๋ฅผ ํตํด ์์ ํ๊ฒ ๊ด๋ฆฌ | ๊ณต์ ๋ณ์๋ ์ํ ๊ด๋ฆฌ๋ ๊ฐ๋ฐ์ ์ฑ ์ |
ExecutionContext
๊ฐ ์ฃผ์ด์ ธ ์ํ๋ฅผ ์์ ํ๊ฒ ๊ด๋ฆฌํ ์ ์๋ค.Partition ์์ฑ:
๊ฐ Partition์์ Chunk ์ฒ๋ฆฌ:
๋ณ๋ ฌ ์คํ:
ํธ๋์ญ์ ๊ด๋ฆฌ:
Partition ์์ฑ:
๋ชจ๋ ๋๊ธฐ์ด ์๋ชจ ๋ฐ ์ต๋ ์ค๋ ๋ ํ ๋๋ฌ => ์์ ์ค๋จ ๋ฐ ์ค๋ฅ
-> ์ด๋ก๋ณผ๋, ์ฟผ๋ฆฌ์์ ๋ณด๋ค apiํธ์ถ์์ ์ ์์์๊ฐ์ด ๊ธธ์ด๋ณด์
ํน์ง | SimpleAsyncTaskExecutor | ThreadPoolTaskExecutor |
---|---|---|
์ค๋ ๋ ์์ฑ ๋ฐฉ์ | ์์ (ํํฐ์ ๋จ์)๋ง๋ค ์ ์ค๋ ๋ ์์ฑ | ์ค๋ ๋ ํ์์ ์ค๋ ๋๋ฅผ ์ฌ์ฌ์ฉ |
์ค๋ ๋ ๊ฐ์ ์ ํ | ์์ | corePoolSize , maxPoolSize ๋ก ์ ํ |
๋๊ธฐ์ด ์ง์ | ์์ | ์์
๋๊ธฐ์ด(queueCapacity ) ์ง์ |
๋๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ | ๋นํจ์จ์ | ํจ์จ์ |
์ค๋ ๋ ๊ด๋ฆฌ | ์์ | ์ค๋ ๋ ํ๋ก ๊ด๋ฆฌ |
์ค๋ฒํค๋ | ์ค๋ ๋ ์์ฑ/์๋ฉธ๋ก ์ค๋ฒํค๋ ํผ | ์ค๋ ๋ ์ฌ์ฌ์ฉ์ผ๋ก ์ค๋ฒํค๋ ์ ์ |
์ถ์ฒ ์ฌ์ฉ ์ฌ๋ก | ๊ฐ๋จํ ์์ , ํ ์คํธ์ฉ | ๋๋ ์์ , ๋ณ๋ ฌ ์ฒ๋ฆฌ, ์์ ์ ์ฒ๋ฆฌ ํ์ ์ |
๋ฒ์ - 2017-11-07 ~ 2017-12-01
startDate=2017-11-07&endDate=2017-12-01
## ์ปฌ๋ผ ๊ฐ์ - 292๊ฐ
๊ธฐ์กด ๋ณ๋ ฌ์ฒ๋ฆฌ : 1๋ถ 7์ด
๊ธฐ์กด๋ก์ง : 6๋ถ 32์ด
t:์ค๋ ๋ ๊ฐ์ / c:๊ฐ ์ค๋ ๋๋ณ ์ฒญํฌ ํํฐ์
์
###### SimpleAsyncTaskExecutor vs ThreadPoolTaskExecutor
SimpleAsyncTaskExecutor
g4/c10 : 1๋ถ 41.586์ด
g5/c10 : 1๋ถ 30.037์ด
g5/c3 : 1๋ถ 28.997์ด
g5/c1 : 1๋ถ 29.931์ด
g8/c10 : 1๋ถ 36.019์ด
g10/c1 : 1๋ถ 24.529์ด
-
ThreadPoolTaskExecutor
Th min/maxSize64, g64/c10 : 1๋ถ 42.848์ด
Th min/maxSize32, g32/c10 : 1๋ถ 26.551์ด
Th min/maxSize16, g16/c10 :
1๋ถ 46.753์ด / 1๋ถ 33.779์ด / 2๋ถ 25.759์ด
1๋ถ 11.668์ด / 2๋ถ 5.773์ด / 2๋ถ 39.553์ด
Th min/maxSize32, g8/c20 : 3๋ถ 25.743์ด
Th min/maxSize16, g4/c10 : 2๋ถ 29.207์ด / 2๋ถ 8.004์ด
@Bean
@Scope(value = "step", proxyMode = ScopedProxyMode.TARGET_CLASS)
public QuerydslPagingItemReader<HfbatBankBalanceCheckDto> balanceReader() {
ExecutionContext jobContext = Objects.requireNonNull(StepSynchronizationManager.getContext()).getStepExecution().getJobExecution().getExecutionContext();
Date startDate = (Date) jobContext.get(START_DATE_KEY);
Date endDate = (Date) jobContext.get(END_DATE_KEY);
return new QuerydslPagingItemReader<>(
entityManagerFactory,
executionOrder,
DEFAULT_CHUNK_SIZE,
queryFactory -> repository.newFindChangeBalanceMemberList(
startDate,
endDate
));
}
JOB
private static AtomicLong executionOrder = new AtomicLong(0);
...
new QuerydslPagingItemReader<>(
entityManagerFactory,
executionOrder,
DEFAULT_CHUNK_SIZE,
queryFactory -> repository.newFindChangeBalanceMemberList(
startDate,
endDate
));
long currentExecutionOrder = executionOrder.getAndIncrement();
long startIndex = (currentExecutionOrder) * getPageSize();
int totalRecords = stepContext.getInt("totalRecords");
if (startIndex >= totalRecords) {
initResults(); // ๋น ๊ฒฐ๊ณผ๋ก ์ด๊ธฐํ
tx.commit();
return;
}
int chunkSizeToRead = Math.min(getPageSize(), (int) (totalRecords - startIndex)); // ๋จ์ ๋ฐ์ดํฐ ํฌ๊ธฐ๋งํผ ์ฝ๊ธฐ
// QueryDSL Query ์์ฑ
JPQLQuery<T> query = createQuery()
.offset(startIndex)
.limit(chunkSizeToRead);
1์ฐจ ๋น๊ต ์ดํ ์ฐจ์ก์ด ๋ฐ์ํ ๋ ์๋ค์ List ๋ฅผ ํ๋ฒ ๋ ๊ฒ์ฆํ ํ, => ์ถํ ๋ณ๊ฒฝ ๋จ ๊ฒ์ฆ๋ ๋ ์๋ค์
List<BalanceCheckResultDto> realDiffList= new ArrayList<>();
์ต์ข ์ ์ธ ์ฐจ์ก ๋ฆฌ์คํธ์ ๋ฃ๋๋ค.
writer๋ ๊ฐ ํ๋ก์ธ์์ ๋ฆฌํด์ผ๋ก ๋ฐ์ ๋ ์๋ค์ ํ๋์ DTO List๋ก ํฉ์ณ ๋ฉ์ธ์ง ์ฒ๋ฆฌ๋ฅผ ํ๊ฒ ๋๋ค.
QueueManagerํด๋์ค๋ฅผ ์์ฑํ๋ฉฐ ๊ณตํต์ผ๋ก ์ฌ์ฉ ๊ฐ๋ฅํ๋๋ก ํ์๊ณ ,
@Slf4j
public class QueueManager<T> {
protected final ConcurrentLinkedQueue<T> sharedQueue = new ConcurrentLinkedQueue<>();
// ๋ฐ์ดํฐ ์ถ๊ฐ
public void addItemToSharedQueue(T item) {
if (item != null) {
sharedQueue.add(item);
}
}
์ด๋ฅผ ์์๋ฐ์ ํน์ ์ค๋ธ์ ํธ๋ฅผ ๋๊ธธ ์ ์๋๋ก ํ์๋ค.
public class BalanceQueue extends QueueManager<BalanceCheckResultDto>{
public List<BalanceCheckResultDto> getDtoFromQueue() {
return super.getItemsFromQueue();
}
}
ํน์ง | JobParameters |
ExecutionContext |
---|---|---|
์ฝ๊ธฐ/์ฐ๊ธฐ ์ฌ๋ถ | ์ฝ๊ธฐ ์ ์ฉ | ์ฝ๊ธฐ/์ฐ๊ธฐ ๊ฐ๋ฅ |
๊ณต์ ๋ฒ์ | Job ์ ์ฒด์์ ๊ณต์ | Step ๋๋ Job ์ ์ฒด์์ ๊ณต์ ๊ฐ๋ฅ |
์ฉ๋ | ์คํ ์ ๋งค๊ฐ๋ณ์ ์ ๋ฌ | ์คํ ์ค ์ํ ์ ์ฅ, ๋ฐ์ดํฐ ๊ณต์ |
์๋ช | JobInstance์ ํจ๊ป ์ ์ง | StepExecution ๋๋ JobExecution๊ณผ ํจ๊ป ์ ์ง |
Restart ์ ์ ์ง ์ฌ๋ถ | ํญ์ ์ ์ง๋จ | Restart ๊ฐ๋ฅํ ์ํ๋ง ์ ์ง |
๊ฐ Step์ ๊ณ ์ ํ ExecutionContext
๊ฐ ์์ฑ๋ฉ๋๋ค.
Step ๋ด์ Reader, Processor, Writer ๋ฑ์์ ๊ณต์ ๋ฉ๋๋ค.
๋ค๋ฅธ Step๊ณผ๋ ๊ณต์ ๋์ง ์์ต๋๋ค.
balanceWorkerStep
๋ด์์๋ Reader, Processor, Writer๊ฐ ๋์ผํ ExecutionContext
๋ฅผ ๊ณต์ ํฉ๋๋ค.
balancePartitionStep
๊ณผ balanceWorkerStep
์ ExecutionContext
๋ ์๋ก ๋
๋ฆฝ์ ์
๋๋ค.
ExecutionContext
์ ์ ๊ทผํ ์ ์์ต๋๋ค.JobExecutionContext
๋ Step ๊ฐ ๋ฐ์ดํฐ ์ ๋ฌ์ด ํ์ํ ๋ ์ ์ฉํฉ๋๋ค.JobExecutionListener ๊ฐ์ฒด ์์ฑ
@Slf4j
public class JobTimerExecutionListener implements JobExecutionListener {
private final String jobName;
private long startTime = System.currentTimeMillis();
public JobTimerExecutionListener(String jobName) {
this.jobName = jobName;
}
@Override
public void beforeJob(JobExecution var1) {
startTime = System.currentTimeMillis();
}
@Override
public void afterJob(JobExecution var1) {
long endTime = System.currentTimeMillis();
long elapsedTime = endTime - startTime;
long minutes = (elapsedTime / 1000) / 60; // ๋ฐ๋ฆฌ์ด๋ฅผ ๋ถ์ผ๋ก ๋ณํ
double seconds = (elapsedTime / 1000.0) % 60; // ๋จ์ ๋ฐ๋ฆฌ์ด๋ฅผ ์ด๋ก ๋ณํ (์์์ ํฌํจ)
log.info("{}-completed: {} ms | {} minutes {} seconds", jobName, elapsedTime, minutes, seconds);
}
}