병렬 스트림
#Java/adv3
단일 스트림
HeavyJob
클래스는 오래 걸리는 작업을 시뮬레이션하는데, 각 작업은 1초 정도 소요된다고 가정하겠다. 입력값에 10을 곱한 결과를 반환하며, 작업이 실행될 때마다 로그를 출력한다.
예제1 - 단일 스트림
단일 스트림(sequential stream)으로 IntStream.rangeClosed(1, 8)
에서 나온 1부터 8까지의 숫자 각각에 대해 heavyTask()
를 수행
long startTime = System.currentTimeMillis();
int sum = IntStream.rangeClosed(1, 8)
.map(HeavyJob::heavyTask)
.reduce(0, (a, b) -> a + b); // sum()
long endTime = System.currentTimeMillis();
log("time: " + (endTime - startTime) + "ms, sum: " + sum);
- 총 8초가 소요된다(1초씩 * 8)
스레드 직접 사용
스레드를 사용하여 작업을 빨리 처리해보자. 작업을 분할한 뒤 여러 스레드로 작업을 동시에 처리한다.
// 1. Fork 작업을 분할한다.
SumTask task1 = new SumTask(1, 4);
SumTask task2 = new SumTask(5, 8);
Thread thread1 = new Thread(task1, "thread-1");
Thread thread2 = new Thread(task2, "thread-2");
// 2. 분할한 작업을 처리한다.
thread1.start();
thread2.start();
// 3. join 처리한 결과를 합친다.
thread1.join();
thread2.join();
log("main 스레드 대기 완료");
int sum = task1.result + task2.result;
SumTask
는Runnable
을 구현했고, 내부에서 1초씩 걸리는heavyTask()
를 루프 돌면서 합산한다.- 고급 1편에서 구현했던 클래스다.
thread1.start()
,thread2.start()
로 각 스레드가 동시에 작업을 시작하고,thread1.join()
,thread2.join()
으로 두 스레드가 끝날 때까지main
스레드가 대기한다. 이후 최총 합계를 구한다.- 작업을 두 스레드가 분할하여 처리했기 때문에 8초 걸리던 작업을 4초로 줄였다.
- 하지만 이렇게 스레드를 직접 사용하면 스레드 수가 늘어나면 코드가 복잡해지고, 예외 처리, 스레드 풀 관리 등 추가 관리 포인트가 생기는 문제가 있다.
스레드 풀 사용:
ExecutorService
를 사용해서 편리하게 병렬 처리를 하도록 개선
// 스레드 풀을 준비한다.
ExecutorService es = Executors.newFixedThreadPool(2);
// 1. 작업을 분할한다.
SumTask task1 = new SumTask(1, 4);
SumTask task2 = new SumTask(5, 8);
// 2. 분할한 작업을 처리한다.
Future<Integer> future1 = es.submit(task1);
Future<Integer> future2 = es.submit(task2);
// 3. join 처리한 결과를 합친다. get: 결과가 나올 때 까지 대기한다.
Integer result1 = future1.get();
Integer result2 = future2.get();
int sum = result1 + result2;
es.close();
- 이전 예제와 마찬가지로 2개의 스레드가 병렬로 계산을 처리하므로 약 4초가 소요된다.
- 하지만 여전히 코드 레벨에서 분할/병합 로직을 직접 짜야 하고, 스레드 풀 생성과 관리도 개발자가 직접해야 한다.
Fork/Join 패턴
분할(Fork), 처리(Execute), 모음(Join)
- 하나의 큰 작업을 여러 스레드가 처리할 수 있는 작은 단위의 작업으로 분할(Fork)한 뒤, 이렇게 분할한 작업을 각각의 스레드가 처리(Execute)한다.
- 이렇게 분할(Fork) → 처리(Execute) → 모음(Join)의 단계로 이루어진 멀티스레딩 패턴을 Fork/Join 패턴이라고 부른다.
- 병렬 프로그래밍에서 매우 효율적인 방식으로, 복잡한 작업을 병렬적으로 처리할 수 있게 해준다.
- 작업 분할: 큰 작업을 여러 작은 작업으로 쪼개어(Fork) 각각의 스레드나 작업 단위로 할당하는 것을 포크(
Fork
)라 한다.- 포크에서 여러 갈래로 나뉘는 것이 작업을 분할하는 거랑 비슷해서 Fork라고 한다나 뭐라나
- 처리(Execute): 스레드가 분할된 각각의 작업을 처리
- Join 모음, 결과 합치기: 분할된 작업들이 모두 끝나면, 각 스레드 혹은 작업 단위별 결과를 하나로 합쳐야한다.
Join
은 이렇게 갈라진 작업들이 모두 끝난 뒤, 다시 합류하여 하나로 결과를 모으는 모습을 의미한다.
자바는 Fork/Join 프레임워크를 제공해서 개발자가 Fork/Join 패턴을 더 쉽게 구현할 수 있도록 지원한다.
Fork/Join 프레임워크1 - 소개
자바의 Fork/Join 프레임워크
는 자바 7부터 도입된 java.util.concurrent
패키지의 일부로, 멀티코어 프로세서를 효율적으로 활용하기 위한 병렬처리 프레임워크다.
분할 정복(Divide and Conquer) 전략
- 큰 작업(
task
)을 작은 단위로 재귀적으로 분할(fork
) - 각 작은 작업의 결과를 합쳐(
join
) 최종 결과를 생성 - 멀티코어 환경에서 작업을 효율적으로 분산 처리
작업 훔치기(Work Stealing) 알고리즘
- 각 스레드는 자신의 작업 큐를 가짐
- 작업이 없는 스레드는 다른 바쁜 스레드의 큐에서 작업을 "훔쳐와서" 대신 처리
- 부하 균형을 자동으로 조절하여 효율성 향상
주요 클래스
ForkJoinPool
- Fork/Join 작업을 실행하는 특수한
ExecutorService
스레드 풀 - 작업 스케줄링 및 스레드 관리를 담당
- 디폴트로 사용 가능한 프로세서 수 만큼 스레드 생성(CPU 코어 수만큼 생성)
- 분할 정복과 작업 훔치기에 특화된 스레드 풀
- Fork/Join 작업을 실행하는 특수한
ForkJoinTask
- Fork/Join 작업의 기본 추상 클래스
Future
를 구현했다.- 주로 두 하위 클래스를 구현해서 사용한다.
RecursiveTask<V>
: 결과를 반환하는 작업RecursiveAction
: 결과를 반환하지 않는 작업(void
)- 구현 방법:
compute()
메서드를 재정의해서 필요한 작업 로직을 작성한다.- 일반적으로 일정 기준(임계값)을 두고, 작업 범위가 작으면 직접 처리하고, 크면 작업을 둘로 분할하여 각각 병렬로 처리하도록 구현한다.
fork() / join() 메서드
fork()
: 현재 스레드에서 다른 스레드로 작업을 분할하여 보내는 동작(비동기 실행)join()
: 분할된 작업이 끝날 때까지 기다린 후 결과를 가져오는 동작
우리가 직접 Fork/Join 프레임워크를 실무에서 직접 다룰 일은 드물다.
Fork/Join 프레임워크 활용
위 예시를 Fork/Join 프레임워크로 구현
핵심은 작업의 크기가 임계값 보다 크면 분할하고, 임계값 보다 같거나 작으면 직접 처리하는 것이다.
작업 크기가 8, 임계값이 4라고 하면
- Fork: 작업의 크기가 8이면 임계값을 넘었다. 따라서 작업을 절반으로 분할한다.
- Execute: 다음으로 작업의 크기가 4라면 임계값의 범위 안에 들어오므로 작업을 분할하지 않고, 처리한다.
- Join: 최종 결과를 합친다.
SumTask
는 RecursiveTask<Integer>
를 상속받아 리스트의 합을 계산하는 작업을 병렬로 처리하는 클래스이다.
이 클래스는 Fork/Join 프레임워크
의 분할 정복 전략을 구현한다. RecursiveTask.compute()
메서드를 재정의해야 하자.
public class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 4; // 임계값
private final List<Integer> list;
public SumTask(List<Integer> list) {
this.list = list;
}
@Override
protected Integer compute() {
// 작업 범위가 작으면 직접 합산
if (list.size() <= THRESHOLD) {
log("[처리 시작] " + list);
int sum = list.stream()
.mapToInt(HeavyJob::heavyTask)
.sum();
log("[처리 완료] " + list + " -> sum: " + sum);
return sum;
} else {
// 작업 범위가 크면 반으로 나누어 병렬 처리
int mid = list.size() / 2;
List<Integer> leftList = list.subList(0, mid);
List<Integer> rightList = list.subList(mid, list.size());
log("[분할] " + list + " -> LEFT" + leftList + ", RIGHT" + rightList);
SumTask leftTask = new SumTask(leftList);
SumTask rightTask = new SumTask(rightList);
// 왼쪽 작업은 다른 스레드에서 처리
leftTask.fork();
// 오른쪽 작업은 현재 스레드에서 처리
int rightResult = rightTask.compute();
// 왼쪽 작업 결과를 기다림
int leftResult = leftTask.join();
// 왼쪽과 오른쪽 작업 결과를 합침
int joinSum = leftResult + rightResult;
log("LEFT[" + leftResult + "] + RIGHT[" + rightResult + "] -> sum: " + joinSum);
return joinSum;
}
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<Integer> data = IntStream.rangeClosed(1, 8)
.boxed()
.toList();
// ForkJoinPool 생성 및 작업 수행
ForkJoinPool pool = new ForkJoinPool(10); // 최대 10개 스레드 사용
long startTime = System.currentTimeMillis();
SumTask task = new SumTask(data);
// 병렬로 합을 구한 후 결과 출력
int result = pool.invoke(task); // 작업 시작 및 완료 대기
pool.close(); // 풀 종료
long endTime = System.currentTimeMillis();
log("time: " + (endTime - startTime) + "ms, sum: " + result);
log("pool: " + pool);
}
- 데이터 생성:
IntStream.rangeClosed(1, 8)
를 사용해 1부터 8까지의 숫자 리스트를 생성한다. - ForkJoinPool 생성:
new ForkJoinPool(10)
으로 최대 10개의 스레드를 사용할 수 있는 풀을 생성한다.- 참고로 기본 생성자(
new ForkJoinPool()
)를 사용하면 시스템의 프로세서 수에 맞춰 스레드가 생성된다.
- invoke(): 메인 스레드가
pool.invoke(task)
를 호출하면SumTask
를 스레드 풀에 전달한다.SumTask
는ForkJoinPool
에 있는 별도의 스레드에서 실행된다. 메인 스레드는 작업이 완료될 때까지 기다린 후 결과를 받는다. - pool.close(): 더 이상 작업이 없으므로 풀을 종료한다.
- 결과 출력: 계산된 리스트의 합과 실행 시간을 출력한다.
실행 결과
- 4초 소요
[main] pool: java.util.concurrent.ForkJoinPool@57baeedf[Terminated, parallelism = 10, size = 0, active = 0, running = 0, steals = 2, tasks = 0, submissions = 0]
정리
Fork/Join 프레임워크
를 사용하면RecursiveTask
를 통해 작업을 재귀적으로 분할하는 것을 확인할 수 있다. 여기서는 작업을 단순히 2개로만 분할해서 스레드도 동시에 2개만 사용할 수 있었다.THRESHOLD
(임계값)을 더 줄여서 작업을 더 잘게 분할하면 더 많은 스레드를 활용할 수 있다. 물론 이 경우 풀의 스레드 수도 2개보다 더 많아야 효과가 있다.- 실제로는 작업 훔치기 과정이 포함되어 있다. 아래서 알아보자.
Fork/Join 프레임워크2 - 작업 훔치기
임계값을 2로 설정하면, 2초의 시간이 소요된다.
작업 훔치기 알고리즘
지금까지 설명을 단순화 하기 위해 작업 훔치기(Work-Stealing
) 알고리즘은 설명하지 않았다. 작업 훔치기가 뭔지 알아보자.
- Fork/Join 풀의 스레드는 각자 자신의 작업 큐를 가진다.
- 덕분에 작업을 큐에서 가져가기 위한 스레드간 경합이 줄어든다.
- 그리고 자신의 작업이 없는 경우, 그래서 스레드가 할 일이 없는 경우에 다른 스레드의 작업 큐에 대기중인 작업을 훔쳐서 대신 처리한다.
과정
ForkJoinPool
에 작업을 요청하면ForkJoinPool
내부에 있는 외부 작업 큐에 작업이 저장된다.- 포크 조인 풀의 스레드는 만약 자신이 할 일이 없고, 자신의 작업 큐에도 작업이 없는 경우 다른 작업 큐에 있는 작업을 훔쳐서 대신 처리할 수 있다.
w1
스레드는 자신이 처리할 일이 없으므로 다른 작업 큐의 작업을 훔친다. 여기서는 외부 작업 큐에 들어 있는 작업을 훔쳐서 대신 처리한다.
w1
은 훔친 작업의compute()
를 호출하면서 작업을 시작한다.w1
은 작업의 크기가 크다고 평가하고 작업을 둘로 분할한다.[1,2,3,4]
의 작업은fork
를 호출해서 비동기로 다른 스레드가 실행해주길 기대한다.- 사실
fork()
는 다른 스레드에 작업을 요청하는게 아니라, 스레드 자신의 작업 큐에 작업을 넣어두는 것이다. - 이후에 자신이 여유가 되면 스스로 보관한 작업을 처리하고, 자신이 여유가 없고 쉬는 스레드가 있다면 쉬는 스레드가 작업을 훔쳐가서 대신 처리한다.
w1
은[5,6,7,8]
작업을 분할한다.[5,6]
은 Fork를 통해서 자신의 작업 큐에 보관한다.[7, 8]
은compute
를 호출해서 스스로 처리한다.
w1
의 작업 큐에 작업이 2개나 대기 중이다. 쉬고 있는w2
가w1
의 작업[1,2,3,4]
를 훔친다.- 참고로 여기에 있는 큐는 데크에 가깝다. 따라서 양쪽으로 넣고 뺄 수 있는 구조이다. 경합이 덜 발생한다.
w2
는 작업의 크기가 크다고 평가하고 작업을 둘로 분할한다.[1,2]
의 작업은fork
를 호출해서 자신의 작업 큐에 넣어둔다.[3,4]
의 작업은compute
를 호출해서 스스로 처리한다. (재귀 호출)
- 작업 큐에 남아있는 작업들을
w3
,w4
스레드가 훔쳐가서 실행한다.w3
:w1
의 작업 큐[5,6]
을 훔쳐서 처리w4
:w2
의 작업 큐[1,2]
를 훔쳐서 처리
작업 훔치기 알고리즘
이 예제에서는 작업량이 균등하게 분배되었지만, 실제 상황에서 작업량이 불균형할 경우 작업 훔치기 알고리즘이 동작하여 유휴 스레드가 다른 바쁜 스레드의 작업을 가져와 처리함으로써 전체 효율성을 높일 수 있다.
Fork/Join 적절한 작업 크기 선택
적절한 임계값 설정은 병렬 처리의 효율성에 큰 영향을 미치므로, 작업의 특성과 시스템 환경에 맞게 조정하는 것이 중요하다.
너무 작은 단위로 작업을 분할하면 스레드 생성과 관리에 드는 오버헤드가 커질 수 있으며, 너무 큰 단위로 분할하면 병렬 처리의 이점을 충분히 활용하지 못할 수 있다.
예) 1 ~ 1000까지 처리해야 하는 작업, 스레드는 10개
- 1개 단위로 쪼개는 경우: 1000개의 분할과 결합이 필요. 한 스레드당 100개의 작업 처리
- 스레드가 작업을 찾고 관리하는 부분도 늘어나고, 분할과 결과를 합하는 과정의 오버헤드도 너무 크다.
- 10개 단위로 쪼개는 경우: 100개의 분할과 결합이 필요. 한 스레드당 10개의 작업 처리
- 100개 단위로 쪼개는 경우: 10개의 분할과 결합이 필요. 한 스레드당 1개의 작업 처리
- 500개 단위로 쪼개는 경우: 2개의 분할과 결합이 필요. 스레드 최대 2개 사용 가능
작업시간이 완전히 동일하게 처리된다고 가정하면 이상적으로는 한 스레드당 1개의 작업을 처리하는 것이 좋다. 왜냐하면 스레드를 100% 사용하면서 분할과 결합의 오버헤드도 최소화 할 수 있기 때문이다.
하지만 작업 시간이 다른 경우를 고려한다면 한 스레드당 1개의 작업 보다는 더 잘게 쪼개어 두는 것이 좋다.
ForkJoinPool
은 스레드의 작업이 끝나면 다른 스레드가 처리하지 못하고 대기하는 작업을 훔쳐서 처리하는 기능을 제공하기 때문이다.
그리고 실질적으로는 작업 시간이 완전히 균등하지 않은 경우가 많다. 이런 상황에서 최적의 임계값 선택을 위해 고려해야 할 요소들은 다음과 같다.
- 작업의 복잡성: 작업이 단순하면 분할 오버헤드가 더 크게 작용한다. 작업이 복잡할수록 더 작은 단위로 나누는 것이 유리할 수 있다. 예를 들어
1 + 2 + 3 + 4
의 아주 단순한 연산을1 + 2
,3 + 4
로 분할하게 되면 분할하고 합하는 비용이 더 든다. - 작업의 균일성: 작업 처리 시간이 불균일할수록 작업 훔치기(
work stealing
)가 효과적으로 작동하도록 적절히 작은 크기로 분할하는 것이 중요하다. - 시스템 하드웨어: 코어 수, 캐시 크기, 메모리 대역폭 등 하드웨어 특성에 따라 최적의 작업 크기가 달라진다.
- 스레드 전환 비용: 너무 작은 작업은 스레드 관리 오버헤드가 증가할 수 있다.
적절한 작업의 크기에 대한 정답은 없지만, CPU 바운드 작업이라고 가정할 때, CPU 코어수에 맞추어 스레드를 생성하고. 작업 수는 스레드 수에 4~ 10배 정도로 생성하자. 물론 작업의 성격에 따라 다르다. 그리고 성능 테스트를 통해 적절한 값으로 조절하면 된다.
Fork/Join 프레임워크3 - 공용 풀
자바 8에서는 공용 풀(Common Pool
) 개념이 도입되었는데, Fork/Join 작업을 위한 자바가 제공하는 기본 스레드 풀이다.
ForkJoinPool commonPool = ForkJoinPool.commonPool();
- 시스템 전체에서 공유: 애플리케이션 내에서 단일 인스턴스로 공유되어 사용된다.
- 자동 생성: 별도로 생성하지 않아도
ForkJoinPool.commonPool()
을 통해 접근할 수 있다. - 편리한 사용: 별도의 풀을 만들지 않고도
RecursiveTask
/RecursiveAction
을 사용할 때 기본적으로 이 공용 풀이 사용된다. - 병렬 스트림 활용: 자바 8의 병렬 스트림은 내부적으로 이 공용 풀을 사용한다. (뒤에서 설명한다.)
- 자원 효율성: 여러 곳에서 별도의 풀을 생성하는 대신 공용 풀을 사용함으로써 시스템 자원을 효율적으로 관리할 수 있다.
- 병렬 수준 자동 설정: 기본적으로 시스템의 가용 프로세서 수에서 1을 뺀 값으로 병렬 수준(
parallelism
)이 설정된다.- CPU 코어 8개 7개의 스레드가 사용됨
개발자는 Fork/Join 공용 풀을 이용하여 Fork/Join 풀을 편리하게 사용할 수 있다.
public class ForkJoinMain2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int processorCount = Runtime.getRuntime().availableProcessors();
ForkJoinPool commonPool = ForkJoinPool.commonPool();
List<Integer> data = IntStream.rangeClosed(1, 8)
.boxed()
.toList();
SumTask task = new SumTask(data); // RecursiveTask 사용
// task.invoke() 호출 -> 내부적으로 common pool 사용
Integer result = task.invoke(); // 공용 풀 사용
log("최종 결과: " + result);
}
// SumTask는 이전 예제와 동일 (THRESHOLD=2)
static class SumTask extends RecursiveTask<Integer> {} // 생략
}
- 이전 예제에서는 직접 ForkJoinPool을 생성하고
pool.invoke(task)
를 통해 풀에 직접 작업을 요청했다. - 이번 예제에서는
task.invoke()
를 통해 작업(RecursiveTask
)에 있는invoke()
를 직접 호출했다. Integer result = task.invoke();
여기서 사용한invoke()
메서드는 현재 스레드(메인)에서 작업을 시작하여compute()
를 호출하고,fork()
로 작업을 분할하여 공용 풀에 있는 워커 스레드들이 분할된 작업을 수행한다.- 처음 compute()는 메인스레드가 호출하고, compute 내의 fork를 호출하면 공용 풀의 스레드로 작업이 분할된다.
- leftTask는 공용 풀 스레드, rightTask는 메인 스레드가 실행한다. 메인 스레드도 작업을 도와주는거다.
- 메인 스레드는 최종 결과가 나올 때 까지 블로킹되야 한다. 그냥 대기하는 것 보단 작업을 도와주는게 효율적이다.
invoke()
: 호출 스레드가 작업을 도우면서 대기(블로킹)한다. 작업의 결과를 반환 받는다.fork()
: 작업을 비동기로 호출하려면invoke()
대신에fork()
를 호출하면 된다.Future
(ForkJoinTask
)를 반환 받는다.
정리
- 공용 풀은 JVM이 종료될 때까지 계속 유지되므로, 별도로 풀을 종료(
shutdown()
)하지 않아도 된다. - 이렇게 공용 풀을 활용하면, 별도로 풀을 생성/관리하는 코드를 작성하지 않아도 간편하게 병렬 처리를 구현할 수 있다.
공용 풀 vs 커스텀 풀
- 자원 관리: 커스텀 풀은 명시적으로 생성하고 관리해야 하지만, 공용 풀은 시스템에서 자동으로 관리된다.
- 재사용성: 공용 풀은 여러 곳에서 공유할 수 있어 자원을 효율적으로 사용할 수 있다.
- 설정 제어: 커스텀 풀은 병렬 수준(스레드의 숫자), 스레드 팩토리 등을 세부적으로 제어할 수 있지만, 공용 풀은 기본 설정을 사용한다.
- 라이프사이클: 커스텀 풀은 명시적으로 종료해야 하지만, 공용 풀은 JVM이 관리한다. 따라서 종료하지 않아도 된다.
참고로 공용 풀 설정은 시스템 속성으로 변경할 수 있지만 권장하지 않는다고 한다.
공용 풀이 CPU - 1만큼 스레드를 생성하는 이유
- 메인 스레드의 참여: 메인 스레드도 작업을 도와주므로, (코어 개수 - 1)개의 워커 스레드 + 1개의 메인스레드로 CPU 코어 활용
- 다른 프로세스와의 자원 경쟁 고려: OS, GC, 다른 Application 같은 내부 작업들도 처리해야 하는데, 모든 코어를 최대치로 활용하면 다른 작업들이 지연되거나 컨텍스트 스위칭 비용이 증가할 수 있다. 코어 여유분 1개를 남겨둬서 시스템 성능을 안정적으로 유지하려는 목적이 있다.
- 효율적인 자원 활용: 특정 상황(다른 작업 스레드나 OS 레벨 작업)에서도 병목을 일으키지 않는 선에서 효율적으로 CPU를 활용
자바 병렬 스트림: 병렬 스트림(parallel()
)
병렬 스트림은 Fork/Join 공용 풀을 사용해서 병렬 연산을 수행한다.
스트림에 parallel()
추가
long startTime = System.currentTimeMillis();
int sum = IntStream.rangeClosed(1, 8)
.parallel() // 추가
.map(HeavyJob::heavyTask)
.reduce(0, (a, b) -> a + b);
long endTime = System.currentTimeMillis();
log("time: " + (endTime - startTime) + "ms, sum: " + sum);
14:56:32.773 [ main] processorCount = 14, commonPool = 13
14:56:32.777 [ main] calculate 6 -> 60
14:56:32.777 [ForkJoinPool.commonPool-worker-2] calculate 2 -> 20
14:56:32.777 [ForkJoinPool.commonPool-worker-3] calculate 4 -> 40
14:56:32.777 [ForkJoinPool.commonPool-worker-1] calculate 3 -> 30
14:56:32.777 [ForkJoinPool.commonPool-worker-5] calculate 7 -> 70
14:56:32.778 [ForkJoinPool.commonPool-worker-6] calculate 1 -> 10
14:56:32.778 [ForkJoinPool.commonPool-worker-4] calculate 8 -> 80
14:56:32.778 [ForkJoinPool.commonPool-worker-7] calculate 5 -> 50
14:56:33.783 [ main] time: 1006ms, sum: 360
- 로그를 보면
ForkJoinPool.commonPool-worker-N
스레드들이 동시에 일을 처리하고 있다. - 예제1에서 8초 이상 걸렸던 작업이, 이 예제에서는 모두 병렬로 실행되어 시간이 약 1초로 크게 줄어든다.
- 만약 CPU 코어가 4개라면 공용 풀에는 3개의 스레드가 생성된다. 따라서 시간이 더 걸릴 수 있다.
- 직접 스레드를 만들 필요 없이 스트림에
parallel()
메서드만 호출하면, 스트림이 자동으로 병렬 처리된다.
동작 원리parallel()
한 줄만 작성했는데 병렬로 처리된다. 병렬 스트림은 내부에서 공용 ForkJoinPool
을 사용하기 때문이다.
스트림에서 parallel()
를 선언하면 스트림은 공용 ForkJoinPool
을 사용하고, 내부적으로 병렬 처리 가능한 스레드 숫자와 작업의 크기 등을 확인하면서, Spliterator
를 통해 데이터를 자동으로 분할한다. 분할 방식은 데이터 소스의 특성에 따라 최적화되어 있다. 그리고 공용 풀을 통해 작업을 적절한 수준으로 분할(Fork), 처리(Execute)하고, 그 결과를 모은다(Join)
(구체적으로 어떻게 병렬로 처리할지는 자바에게 위임 선언형 프로그래밍 스타일이다.)
병렬 스트림 사용시 주의점1
병렬 스트림은 실무에서 쓸 일이 많지는 않고, 쓰더라도 굉장히 조심히 써야 한다.
병렬 스트림은 Fork/Join 공용 풀
을 사용한다. Fork/Join
공용 풀은 CPU 바운드 작업(계산 집약적인 작업)을 위해 설계되었다. 따라서 스레드가 주로 대기해야 하는 I/O 바운드 작업에는 적합하지 않다.
주의사항 - Fork/Join 프레임워크는 CPU 바운드 작업에만 사용해라!
- CPU 사용률이 높고 I/O 대기 시간이 적은 CPU 바운드 작업의 경우, 물리적인 CPU 코어와 비슷한 수의 스레드를 사용하는 것이 최적의 성능을 발휘할 수 있다.
- 스레드가 코어 수보다 많아지면 컨텍스트 스위칭 비용도 증가하고, 스레드 간 경쟁으로 성능이 저하될 수 있다.
만약에 I/O 작업을 ForkJoinPool에서 처리한다면?
- 스레드 블로킹에 따른 CPU 낭비
ForkJoinPool
은 CPU 코어 수에 맞춰 제한된 개수의 스레드를 사용한다. (특히 공용 풀)- I/O 작업으로 스레드가 블로킹되면 CPU가 놀게 되어, 전체 병렬 처리 효율이 크게 떨어진다.
- 컨텍스트 스위칭 오버헤드 증가
- I/O 작업 때문에 스레드를 늘리면, 실제 연산보다 대기 시간이 길어지는 상황이 발생할 수 있다.
- 스레드가 많아질수록 컨텍스트 스위칭(
context switching
) 비용도 증가하여 오히려 성능이 떨어질 수 있다.
- 작업 훔치기 기법 무력화
ForkJoinPool
이 제공하는 작업 훔치기 알고리즘은, CPU 바운드 작업에서 빠르게 작업 단위를 계속 처리하도록 설계되었다. (작업을 훔쳐서 쉬는 스레드 없이 계속 작업)- I/O 대기 시간이 많은 작업은 스래드가 I/O로 인해 대기하고 있는 경우가 많아, 작업 훔치기가 빛을 발휘하기 어렵고, 결과적으로 병렬 처리의 장점을 살리기 어렵다.
- 분할-정복(작업 분할) 이점 감소
Fork/Join
방식을 통해 작업을 잘게 나누어도, I/O 병목이 발생하면 CPU 병렬화 이점이 크게 줄어든다.- 오히려 분할된 작업들이 각기 I/O 대기를 반복하면서,
fork()
,join()
에 따른 오버헤드만 증가할 수 있다.
블로킹 작업이나 특수한 설정이 필요한 경우에는 커스텀 풀을 고려해야 한다.
- CPU 바운드 작업이라면
ForkJoinPool
을 통해 병렬 계산을 극대화할 수 있지만, I/O 바운드 작업은 별도의 전용 스레드 풀을 사용하는 편이 더 적합하다. 예)Executors.newFixedThreadPool()
등등
병렬 스트림 - 예제5
public static void main(String[] args) throws InterruptedException {
// 병렬 수준 3으로 제한
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "3");
// 요청 풀 추가
ExecutorService requestPool = Executors.newFixedThreadPool(100);
// 요청 스레드 수
int nThreads = 3; // 1, 2, 3, 10, 20
for (int i = 1; i <= nThreads; i++) {
String requestName = "request" + i;
requestPool.submit(() -> logic(requestName));
Thread.sleep(100); // 스레드 순서를 확인하기 위해 약간 대기
}
requestPool.close();
}
private static void logic(String requestName) {
long startTime = System.currentTimeMillis();
int sum = IntStream.rangeClosed(1, 4)
.parallel() // 주석 처리가 더 빨리 처리될 수 있음
.map(i -> HeavyJob.heavyTask(i, requestName))
.reduce(0, (a, b) -> a + b);
long endTime = System.currentTimeMillis();
log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
- 3개의 요청 스레드가 각각 (1~4)에 대한 작업을 수행한다.
- 첫 번째 요청은 1초가 걸리지만, 두 번째 요청은 약 2초, 세 번째 요청은 약 3초 정도 걸린다. 왜 지연이 발생할까?
- 공용 풀의 제한된 병렬성
- 공용 풀은 병렬 수준(
parallelism
)이 3으로 설정되어 있어, 최대 3개의 작업만 동시에 처리할 수 있다. 여기에 요청 스레드도 자신의 작업에 참여하므로 각 작업당 총 4개의 스레드만 사용된다. - 따라서 총 12개의 요청(각각 4개의 작업)을 처리하는데 필요한 스레드 자원이 부족하다.
- 공용 풀은 병렬 수준(
- 처리 시간의 불균형
- 첫 번째 요청은 거의 모든 공용 풀 워커를 사용할 수 있었지만, 이후 요청들은 제한된 공용 풀 자원을 두고 경쟁해야 한다. 따라서 완료 시간이 점점 느려진다.
- 스레드 작업 분배: 일부 작업은 요청 스레드에서, 일부는 공용 풀에서 처리된다. 요청 스레드가 도와줘도 공용 풀의 스레드가 부족하여 한계가 있다.
요청이 증가할 수록 문제는 더 심해진다. nThreads
를 키워주면, 응답 시간이 확연하게 늘어난다.
이렇듯 공용 풀 병목 현상, 자원 경쟁, 예측 불가능한 성능와 같은 문제점으로 인해, WAS에서 수 많은 요청에 공용 풀을 사용하는 건 위험하다. 실무에서 공용 풀은 절대로 I/O 바운드 작업을 하면 안된다. 스레드가 모두 I/O 작업을 대기하면, Application에서 공용 풀을 사용하는 다른 모든 요청이 밀린다. 공용 풀은 반드시 CPU 바운드 작업에만 사용해야 한다. 애초에 설계도 이 목적이다.
(이런 경우 오히려 parallel()
을 제거하는 게 더 빠를 수 있다.)
I/O 바운드 작업이 많을 때 병렬 처리는 스레드를 직접 다루거나, ExecutorService 등을 통해 별도의 스레드 풀을 사용해야 한다.
병렬 스트림 사용시 주의점2:
별도의 스레드 풀로 개선해보자
// logic 처리 전용 스레드 풀
ExecutorService logicPool = Executors.newFixedThreadPool(400);
private static void logic(String requestName, ExecutorService es) { // 로직 풀 받음
log("[" + requestName + "] START");
long startTime = System.currentTimeMillis();
// 1부터 4까지의 작업을 각각 스레드 풀에 제출
Future<Integer> f1 = es.submit(() -> HeavyJob.heavyTask(1, requestName));
Future<Integer> f2 = es.submit(() -> HeavyJob.heavyTask(2, requestName));
Future<Integer> f3 = es.submit(() -> HeavyJob.heavyTask(3, requestName));
Future<Integer> f4 = es.submit(() -> HeavyJob.heavyTask(4, requestName));
// Future 결과 취합
int sum;
try {
Integer v1 = f1.get();
Integer v2 = f2.get();
Integer v3 = f3.get();
Integer v4 = f4.get();
sum = v1 + v2 + v3 + v4;
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
- 별도의 스레드 풀을 생성하여 병렬 작업 처리에 사용
- 병렬 스트림을 사용하지 않고 직접 전용 스레드 풀에 작업을 제출한다. 각 작업은
Future
로 관리된다. - 모든 요청이 약 1초 내로 처리되고, 처리 시간이 일관된다. 요청 지연 문제 해결!
- 확장성 향상: 400개의 스레드를 가진 풀을 사용함으로써, 동시에 여러 요청을 효율적으로 처리한다.
스트림으로 코드를 개선하면
private static void logic(String requestName, ExecutorService es) {
log("[" + requestName + "] START");
long startTime = System.currentTimeMillis();
// 1부터 4까지의 작업을 각각 스레드 풀에 제출하고 Future 리스트 생성
List<Future<Integer>> futures = IntStream.rangeClosed(1, 4)
.mapToObj(i -> es.submit(() -> HeavyJob.heavyTask(i, requestName)))
.toList();
// Future 결과 취합 (Stream API 사용)
int sum = futures.stream()
.mapToInt(f -> {
try {
return f.get(); // Future 결과 가져오기 (블로킹)
} catch (Exception e) {
throw new RuntimeException(e);
}
}).sum();
long endTime = System.currentTimeMillis();
log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
CompletableFuture와 주의 사항
특히 실무에서 자주 하는 실수가 병렬 스트림을 I/O 대기 작업에 사용하거나, 또는 CompletableFuture
를 사용할 때 발생한다
- 실무에서 복잡한 멀티스레드 코드를 작성할 때는
CompletableFuture
가 도움이 된다. - CompletableFuture는 스레드 요청 처리를 스트림처럼 메서드 체인, 람다를 이용해서 편하게 쓸 수 있게 해준다.
CompletableFuture
를 생성할 때는 별도의 스레드 풀을 반드시 지정해야 한다. 그렇지 않으면 Fork/Join 공용 풀이 대신 사용된다. 이 때문에 많은 장애가 발생한다.CompletableFuture
를 사용할 때는 반드시! 커스텀 풀을 지정해서 사용하자!
public static void main(String[] args) {
// 기본: ForkJoinPool.commonPool() 사용
CompletableFuture.runAsync(() -> log("Fork/Join")); // Fork/Join 공용 풀
// ExecutorService 직접 지정: 전용 스레드 풀 사용
ExecutorService es = Executors.newFixedThreadPool(100);
CompletableFuture.runAsync(() -> log("Custom Pool"), es); // 별도의 풀
es.close(); // ExecutorService 종료
}
'Java > Modern Java(8~)' 카테고리의 다른 글
[Java] 67. 디폴트 메서드 (0) | 2025.07.04 |
---|---|
[Java] 66. Optional (1) | 2025.07.04 |
[Java] 65. 스트림 API3 - 컬렉터 (0) | 2025.07.04 |
[Java] 64. 스트림 API2 - 기능 (0) | 2025.07.04 |
[Java] 63. 스트림 API1 - 기본 (0) | 2025.07.04 |