๐ฆ Spring Batch ํธ๋ฌ๋ธ์ํ
** Chunk ๋ฐฉ์์ Batch์์ ChunkSize๋, ํ ํธ๋ ์ ์ ๋ด์์ ์ฒ๋ฆฌํ ์ปฌ๋ผ(DTO/๋ชจ๋ธ)์ ๊ฐ์์ด๋ค.
์ฆ, ChunkSize๊ฐ ์์์๋ก ๋ฐ์ดํฐ I/O์์ ๋ฐ Overhead(๋ฐ์ดํฐ ์ฝ๊ธฐ/์ฐ๊ธฐ, ํธ๋์ญ์ ์์ ๋ฐ ์ข ๋ฃ ๋ฑ)๊ฐ ์ฆ๊ฐํ์ฌ ์ด ์คํ์๊ฐ์ด ๊ธธ์ด์ ธ์ผํ๋ค.
grid-size:12 / chunk-size:30
-
3๋ถ 22.856์ด
3๋ถ 23.096์ด
grid-size:12 / chunk-size:20
-
3๋ถ 23.546์ด
3๋ถ 23.784์ด
grid-size:12 / chunk-size:10
-
3๋ถ 24.243์ด
3๋ถ 22.389์ด
3๋ถ 24.667์ด
3๋ถ 24.789์ด
grid-size:12 / chunk-size:5
-
3๋ถ 24.953์ด
3๋ถ 24.353์ด
dtoList.stream().parallel()
.forEach(dto -> {});
}
}
IntStream.range(1, 10)
.parallel()
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
}
}
- ์ถ๋ ฅ
ForkJoinPool.commonPool-worker-3 - 3
ForkJoinPool.commonPool-worker-1 - 1
ForkJoinPool.commonPool-worker-2 - 2
ForkJoinPool.commonPool-worker-0 - 4
...
parallel()
๊ตฌ๋ฌธ์ ์ฌ์ฉํ์ฌ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์งํํ๋ค๋ฉด,
1.๋ฐ์ดํฐ๋ฅผ ์์ ๋จ์(Chunk)๋ก ๋ถํ
- ์๋ฅผ ๋ค์ด, 1000๊ฐ์ ๋ฐ์ดํฐ๋ฅผ 4๊ฐ์ ์ค๋ ๋์์ ์ฒ๋ฆฌํ๋ค๊ณ ํ๋ฉด, ForkJoinPool
์ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ๊ฐ์ Task๋ก ๋๋๋ค.
2.Worker Thread๋ค์ด ๋ถํ ๋ ์์
์ ๋ณ๋ ฌ๋ก ์คํ
- ๊ฐ ์ค๋ ๋๋ ์์ ์ด ๋งก์ ์์
์ ์ฒ๋ฆฌํ๊ณ , ๋จ๋ ์์
์ด ์๋ค๋ฉด ๋ค๋ฅธ ์ค๋ ๋์ ์์
์ ํ์ณ(Work-Stealing) ๊ฐ์ ธ์ ์คํํ๋ค.
3.์ต์ข
์ ์ผ๋ก ๊ฒฐ๊ณผ๋ฅผ ํฉ์ณ์ ๋ฐํ
- ๋ชจ๋ ์์
์ด ์๋ฃ๋๋ฉด ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ ๊ฒฐ๊ณผ๊ฐ ํ๋๋ก ํฉ์ณ์ง๋คForkJoinPool
์ด๋?Java์์ ์ ๊ณตํ๋ ๋ณ๋ ฌ ์์
์ ์ต์ ํํ๋ ์ค๋ ๋ ํ๋ก์จ,
Work-Stealing ์๊ณ ๋ฆฌ์ฆ์ ์ฌ์ฉํ์ฌ ์ ํด์ค๋ ๋๋ฅผ ์ต์ํํ๊ณ CPU ํ์ฉ๋๋ฅผ ๊ทน๋ํํ๋ ๊ธฐ๋ฒ์ด๋ค.
ํด๋น ๊ณผ์ ์์ ForkJoinPool
์ ์์
์ ์ฌ๋ฌ ๊ฐ์ ์์ปค ์ค๋ ๋(ForkJoinPool-worker-*
)์์ ์คํํ๊ธฐ ๋๋ฌธ์ Spring์ ThreadLocal ๊ธฐ๋ฐ ํธ๋์ญ์
์ด ์ ํ๋์ง ์๋๋ค.
ForkJoinPool
๊ธฐ๋ฐ์ ์ ํํ ๋์์๋ฆฌ๋ฅผ ์ถฉ๋ถํ ๊ณ ๋ คํ์ง ์์ ๋ฐ์ํ ์ด์์ด๋ค.forEach
๋ฌธ์ผ๋ก ๋ณ๊ฒฝํจ์ผ๋ก์จ ์ด์๋ฅผ ํด๊ฒฐํ ์ ์์๋ค.ํนํ, ๋ฉํฐ์ค๋ ๋๋ฅผ ๋ค๋ฃฐ ๋์๋ ๋ณ๋ ฌ์ฒ๋ฆฌ๋ฅผ ํจ์ ์์ด ์ฃผ์๋ฅผ ํ์๋ก ํจ์ ๊นจ๋ฌ์๋ค.
SHOW VARIABLES LIKE 'max_connections'; //์ต๋ ๊ฐ์
SHOW STATUS LIKE 'Threads_connected'; //์ฌ์ฉ์ค์ธ ๊ฐ์
์ฒ์ ๋ช ๋ฒ๊ฐ์ ์ ์์คํ ๋์ง๋ง, ๋ฐ๋ณต ํ ์คํธ ์ค ์ค๋ ๋ ํ ์ ์ ๋๊ธฐ ํ์์์์ด ๋ฐ์ํ๋ค.
๋จผ์ , QuerydslPagingItemReader์ doReadPage()์ ์ข ๋ฃ์กฐ๊ฑด์์ ํธ๋ ์ ์ ์ปค๋ฐ์ ๋ณ๋๋ก ์ํํ์ง ์๊ณ ๋ฆฌํด์ ์ํค๊ณ ์์๋ค.
-> ๊ทธ๋ผ์๋ Step์ด ๋ง๋ฌด๋ฆฌ๋ ๋, ์ ์ด๋ Job์ด ๋ง๋ฌด๋ฆฌ ๋ ๋, entityManager๋ฅผ ํด๋ก์ฆ ์ํค๋๊ฒ์ด ์๋ช ํ๋ฐ, ์ด์งธ์ ์ปค๋ฅ์ ํ์ด ํด์ ๋์ง ์์ ์ ์๋๊ฐ.
๋จผ์ , ์คํฐํฐ ๋งค๋์ ๋ ์ปค๋ฅ์
์ ๋ฐํ์ ์์๋ฐ๋๋ค.
๊ทธ๋ฆฌ๊ณ .. transactionObserver๋ ์คํฐํฐ ๋งค๋์ ๊ฐ์ฒด์ ์ฐ๊ฒฐ๋ ํธ๋์ ์
๋งค๋์ ๊ฐ ์ข
๋ฃ๋๊ธฐ๋ฅผ ๋ฐ๋ผ๋ณด๊ณ ์์๋ค.
์ฆ,
์ํฐํฐ ๋งค๋์ ๋ ํด๋ก์ฆ ๋ ๋, ํธ๋ ์ ์ ์ด ์ด์์๋ค๋ฉด ๊ทธ ํธ๋ ์ ์ ์ด ์ข ๋ฃ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค.
protected void doReadPage() {
...
if (startIndex >= totalRecords) { //๋ง์ง๋ง์ธ๋ฑ์ค ํ์ธ
initResults(); // ๋น ๊ฒฐ๊ณผ๋ก ์ด๊ธฐํ
tx.commit(); //ํธ๋ ์ ์
์ปค๋ฐ
return;
}
...