병렬 스트림
#Java/adv3
단일 스트림
HeavyJob 클래스는 오래 걸리는 작업을 시뮬레이션하는데, 각 작업은 1초 정도 소요된다고 가정하겠다. 입력값에 10을 곱한 결과를 반환하며, 작업이 실행될 때마다 로그를 출력한다.
예제1 - 단일 스트림
단일 스트림(sequential stream)으로 IntStream.rangeClosed(1, 8)에서 나온 1부터 8까지의 숫자 각각에 대해 heavyTask()를 수행
long startTime = System.currentTimeMillis();
int sum = IntStream.rangeClosed(1, 8)
.map(HeavyJob::heavyTask)
.reduce(0, (a, b) -> a + b); // sum()
long endTime = System.currentTimeMillis();
log("time: " + (endTime - startTime) + "ms, sum: " + sum);
- 총 8초가 소요된다(1초씩 * 8)
스레드 직접 사용
스레드를 사용하여 작업을 빨리 처리해보자. 작업을 분할한 뒤 여러 스레드로 작업을 동시에 처리한다.
// 1. Fork 작업을 분할한다.
SumTask task1 = new SumTask(1, 4);
SumTask task2 = new SumTask(5, 8);
Thread thread1 = new Thread(task1, "thread-1");
Thread thread2 = new Thread(task2, "thread-2");
// 2. 분할한 작업을 처리한다.
thread1.start();
thread2.start();
// 3. join 처리한 결과를 합친다.
thread1.join();
thread2.join();
log("main 스레드 대기 완료");
int sum = task1.result + task2.result;
SumTask는Runnable을 구현했고, 내부에서 1초씩 걸리는heavyTask()를 루프 돌면서 합산한다.- 고급 1편에서 구현했던 클래스다.
thread1.start(),thread2.start()로 각 스레드가 동시에 작업을 시작하고,thread1.join(),thread2.join()으로 두 스레드가 끝날 때까지main스레드가 대기한다. 이후 최총 합계를 구한다.- 작업을 두 스레드가 분할하여 처리했기 때문에 8초 걸리던 작업을 4초로 줄였다.
- 하지만 이렇게 스레드를 직접 사용하면 스레드 수가 늘어나면 코드가 복잡해지고, 예외 처리, 스레드 풀 관리 등 추가 관리 포인트가 생기는 문제가 있다.
스레드 풀 사용:
ExecutorService를 사용해서 편리하게 병렬 처리를 하도록 개선
// 스레드 풀을 준비한다.
ExecutorService es = Executors.newFixedThreadPool(2);
// 1. 작업을 분할한다.
SumTask task1 = new SumTask(1, 4);
SumTask task2 = new SumTask(5, 8);
// 2. 분할한 작업을 처리한다.
Future<Integer> future1 = es.submit(task1);
Future<Integer> future2 = es.submit(task2);
// 3. join 처리한 결과를 합친다. get: 결과가 나올 때 까지 대기한다.
Integer result1 = future1.get();
Integer result2 = future2.get();
int sum = result1 + result2;
es.close();
- 이전 예제와 마찬가지로 2개의 스레드가 병렬로 계산을 처리하므로 약 4초가 소요된다.
- 하지만 여전히 코드 레벨에서 분할/병합 로직을 직접 짜야 하고, 스레드 풀 생성과 관리도 개발자가 직접해야 한다.
Fork/Join 패턴
분할(Fork), 처리(Execute), 모음(Join)
- 하나의 큰 작업을 여러 스레드가 처리할 수 있는 작은 단위의 작업으로 분할(Fork)한 뒤, 이렇게 분할한 작업을 각각의 스레드가 처리(Execute)한다.
- 이렇게 분할(Fork) → 처리(Execute) → 모음(Join)의 단계로 이루어진 멀티스레딩 패턴을 Fork/Join 패턴이라고 부른다.
- 병렬 프로그래밍에서 매우 효율적인 방식으로, 복잡한 작업을 병렬적으로 처리할 수 있게 해준다.
- 작업 분할: 큰 작업을 여러 작은 작업으로 쪼개어(Fork) 각각의 스레드나 작업 단위로 할당하는 것을 포크(
Fork)라 한다.- 포크에서 여러 갈래로 나뉘는 것이 작업을 분할하는 거랑 비슷해서 Fork라고 한다나 뭐라나
- 처리(Execute): 스레드가 분할된 각각의 작업을 처리
- Join 모음, 결과 합치기: 분할된 작업들이 모두 끝나면, 각 스레드 혹은 작업 단위별 결과를 하나로 합쳐야한다.
Join은 이렇게 갈라진 작업들이 모두 끝난 뒤, 다시 합류하여 하나로 결과를 모으는 모습을 의미한다.
자바는 Fork/Join 프레임워크를 제공해서 개발자가 Fork/Join 패턴을 더 쉽게 구현할 수 있도록 지원한다.
Fork/Join 프레임워크1 - 소개
자바의 Fork/Join 프레임워크는 자바 7부터 도입된 java.util.concurrent 패키지의 일부로, 멀티코어 프로세서를 효율적으로 활용하기 위한 병렬처리 프레임워크다.
분할 정복(Divide and Conquer) 전략
- 큰 작업(
task)을 작은 단위로 재귀적으로 분할(fork) - 각 작은 작업의 결과를 합쳐(
join) 최종 결과를 생성 - 멀티코어 환경에서 작업을 효율적으로 분산 처리
작업 훔치기(Work Stealing) 알고리즘
- 각 스레드는 자신의 작업 큐를 가짐
- 작업이 없는 스레드는 다른 바쁜 스레드의 큐에서 작업을 "훔쳐와서" 대신 처리
- 부하 균형을 자동으로 조절하여 효율성 향상
주요 클래스
ForkJoinPool- Fork/Join 작업을 실행하는 특수한
ExecutorService스레드 풀 - 작업 스케줄링 및 스레드 관리를 담당
- 디폴트로 사용 가능한 프로세서 수 만큼 스레드 생성(CPU 코어 수만큼 생성)
- 분할 정복과 작업 훔치기에 특화된 스레드 풀
- Fork/Join 작업을 실행하는 특수한
ForkJoinTask- Fork/Join 작업의 기본 추상 클래스
Future를 구현했다.- 주로 두 하위 클래스를 구현해서 사용한다.
RecursiveTask<V>: 결과를 반환하는 작업RecursiveAction: 결과를 반환하지 않는 작업(void)- 구현 방법:
compute()메서드를 재정의해서 필요한 작업 로직을 작성한다.- 일반적으로 일정 기준(임계값)을 두고, 작업 범위가 작으면 직접 처리하고, 크면 작업을 둘로 분할하여 각각 병렬로 처리하도록 구현한다.
fork() / join() 메서드
fork(): 현재 스레드에서 다른 스레드로 작업을 분할하여 보내는 동작(비동기 실행)join(): 분할된 작업이 끝날 때까지 기다린 후 결과를 가져오는 동작
우리가 직접 Fork/Join 프레임워크를 실무에서 직접 다룰 일은 드물다.
Fork/Join 프레임워크 활용
위 예시를 Fork/Join 프레임워크로 구현
핵심은 작업의 크기가 임계값 보다 크면 분할하고, 임계값 보다 같거나 작으면 직접 처리하는 것이다.
작업 크기가 8, 임계값이 4라고 하면
- Fork: 작업의 크기가 8이면 임계값을 넘었다. 따라서 작업을 절반으로 분할한다.
- Execute: 다음으로 작업의 크기가 4라면 임계값의 범위 안에 들어오므로 작업을 분할하지 않고, 처리한다.
- Join: 최종 결과를 합친다.
SumTask는 RecursiveTask<Integer> 를 상속받아 리스트의 합을 계산하는 작업을 병렬로 처리하는 클래스이다.
이 클래스는 Fork/Join 프레임워크의 분할 정복 전략을 구현한다. RecursiveTask.compute()메서드를 재정의해야 하자.
public class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 4; // 임계값
private final List<Integer> list;
public SumTask(List<Integer> list) {
this.list = list;
}
@Override
protected Integer compute() {
// 작업 범위가 작으면 직접 합산
if (list.size() <= THRESHOLD) {
log("[처리 시작] " + list);
int sum = list.stream()
.mapToInt(HeavyJob::heavyTask)
.sum();
log("[처리 완료] " + list + " -> sum: " + sum);
return sum;
} else {
// 작업 범위가 크면 반으로 나누어 병렬 처리
int mid = list.size() / 2;
List<Integer> leftList = list.subList(0, mid);
List<Integer> rightList = list.subList(mid, list.size());
log("[분할] " + list + " -> LEFT" + leftList + ", RIGHT" + rightList);
SumTask leftTask = new SumTask(leftList);
SumTask rightTask = new SumTask(rightList);
// 왼쪽 작업은 다른 스레드에서 처리
leftTask.fork();
// 오른쪽 작업은 현재 스레드에서 처리
int rightResult = rightTask.compute();
// 왼쪽 작업 결과를 기다림
int leftResult = leftTask.join();
// 왼쪽과 오른쪽 작업 결과를 합침
int joinSum = leftResult + rightResult;
log("LEFT[" + leftResult + "] + RIGHT[" + rightResult + "] -> sum: " + joinSum);
return joinSum;
}
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<Integer> data = IntStream.rangeClosed(1, 8)
.boxed()
.toList();
// ForkJoinPool 생성 및 작업 수행
ForkJoinPool pool = new ForkJoinPool(10); // 최대 10개 스레드 사용
long startTime = System.currentTimeMillis();
SumTask task = new SumTask(data);
// 병렬로 합을 구한 후 결과 출력
int result = pool.invoke(task); // 작업 시작 및 완료 대기
pool.close(); // 풀 종료
long endTime = System.currentTimeMillis();
log("time: " + (endTime - startTime) + "ms, sum: " + result);
log("pool: " + pool);
}
- 데이터 생성:
IntStream.rangeClosed(1, 8)를 사용해 1부터 8까지의 숫자 리스트를 생성한다. - ForkJoinPool 생성:
new ForkJoinPool(10)으로 최대 10개의 스레드를 사용할 수 있는 풀을 생성한다.- 참고로 기본 생성자(
new ForkJoinPool())를 사용하면 시스템의 프로세서 수에 맞춰 스레드가 생성된다.
- invoke(): 메인 스레드가
pool.invoke(task)를 호출하면SumTask를 스레드 풀에 전달한다.SumTask는ForkJoinPool에 있는 별도의 스레드에서 실행된다. 메인 스레드는 작업이 완료될 때까지 기다린 후 결과를 받는다. - pool.close(): 더 이상 작업이 없으므로 풀을 종료한다.
- 결과 출력: 계산된 리스트의 합과 실행 시간을 출력한다.
실행 결과
- 4초 소요
[main] pool: java.util.concurrent.ForkJoinPool@57baeedf[Terminated, parallelism = 10, size = 0, active = 0, running = 0, steals = 2, tasks = 0, submissions = 0]
정리
Fork/Join 프레임워크를 사용하면RecursiveTask를 통해 작업을 재귀적으로 분할하는 것을 확인할 수 있다. 여기서는 작업을 단순히 2개로만 분할해서 스레드도 동시에 2개만 사용할 수 있었다.THRESHOLD(임계값)을 더 줄여서 작업을 더 잘게 분할하면 더 많은 스레드를 활용할 수 있다. 물론 이 경우 풀의 스레드 수도 2개보다 더 많아야 효과가 있다.- 실제로는 작업 훔치기 과정이 포함되어 있다. 아래서 알아보자.
Fork/Join 프레임워크2 - 작업 훔치기
임계값을 2로 설정하면, 2초의 시간이 소요된다.
작업 훔치기 알고리즘
지금까지 설명을 단순화 하기 위해 작업 훔치기(Work-Stealing) 알고리즘은 설명하지 않았다. 작업 훔치기가 뭔지 알아보자.
- Fork/Join 풀의 스레드는 각자 자신의 작업 큐를 가진다.
- 덕분에 작업을 큐에서 가져가기 위한 스레드간 경합이 줄어든다.
- 그리고 자신의 작업이 없는 경우, 그래서 스레드가 할 일이 없는 경우에 다른 스레드의 작업 큐에 대기중인 작업을 훔쳐서 대신 처리한다.
과정
ForkJoinPool에 작업을 요청하면ForkJoinPool내부에 있는 외부 작업 큐에 작업이 저장된다.- 포크 조인 풀의 스레드는 만약 자신이 할 일이 없고, 자신의 작업 큐에도 작업이 없는 경우 다른 작업 큐에 있는 작업을 훔쳐서 대신 처리할 수 있다.
w1스레드는 자신이 처리할 일이 없으므로 다른 작업 큐의 작업을 훔친다. 여기서는 외부 작업 큐에 들어 있는 작업을 훔쳐서 대신 처리한다.
w1은 훔친 작업의compute()를 호출하면서 작업을 시작한다.w1은 작업의 크기가 크다고 평가하고 작업을 둘로 분할한다.[1,2,3,4]의 작업은fork를 호출해서 비동기로 다른 스레드가 실행해주길 기대한다.- 사실
fork()는 다른 스레드에 작업을 요청하는게 아니라, 스레드 자신의 작업 큐에 작업을 넣어두는 것이다. - 이후에 자신이 여유가 되면 스스로 보관한 작업을 처리하고, 자신이 여유가 없고 쉬는 스레드가 있다면 쉬는 스레드가 작업을 훔쳐가서 대신 처리한다.
w1은[5,6,7,8]작업을 분할한다.[5,6]은 Fork를 통해서 자신의 작업 큐에 보관한다.[7, 8]은compute를 호출해서 스스로 처리한다.
w1의 작업 큐에 작업이 2개나 대기 중이다. 쉬고 있는w2가w1의 작업[1,2,3,4]를 훔친다.- 참고로 여기에 있는 큐는 데크에 가깝다. 따라서 양쪽으로 넣고 뺄 수 있는 구조이다. 경합이 덜 발생한다.
w2는 작업의 크기가 크다고 평가하고 작업을 둘로 분할한다.[1,2]의 작업은fork를 호출해서 자신의 작업 큐에 넣어둔다.[3,4]의 작업은compute를 호출해서 스스로 처리한다. (재귀 호출)
- 작업 큐에 남아있는 작업들을
w3,w4스레드가 훔쳐가서 실행한다.w3:w1의 작업 큐[5,6]을 훔쳐서 처리w4:w2의 작업 큐[1,2]를 훔쳐서 처리
작업 훔치기 알고리즘
이 예제에서는 작업량이 균등하게 분배되었지만, 실제 상황에서 작업량이 불균형할 경우 작업 훔치기 알고리즘이 동작하여 유휴 스레드가 다른 바쁜 스레드의 작업을 가져와 처리함으로써 전체 효율성을 높일 수 있다.
Fork/Join 적절한 작업 크기 선택
적절한 임계값 설정은 병렬 처리의 효율성에 큰 영향을 미치므로, 작업의 특성과 시스템 환경에 맞게 조정하는 것이 중요하다.
너무 작은 단위로 작업을 분할하면 스레드 생성과 관리에 드는 오버헤드가 커질 수 있으며, 너무 큰 단위로 분할하면 병렬 처리의 이점을 충분히 활용하지 못할 수 있다.
예) 1 ~ 1000까지 처리해야 하는 작업, 스레드는 10개
- 1개 단위로 쪼개는 경우: 1000개의 분할과 결합이 필요. 한 스레드당 100개의 작업 처리
- 스레드가 작업을 찾고 관리하는 부분도 늘어나고, 분할과 결과를 합하는 과정의 오버헤드도 너무 크다.
- 10개 단위로 쪼개는 경우: 100개의 분할과 결합이 필요. 한 스레드당 10개의 작업 처리
- 100개 단위로 쪼개는 경우: 10개의 분할과 결합이 필요. 한 스레드당 1개의 작업 처리
- 500개 단위로 쪼개는 경우: 2개의 분할과 결합이 필요. 스레드 최대 2개 사용 가능
작업시간이 완전히 동일하게 처리된다고 가정하면 이상적으로는 한 스레드당 1개의 작업을 처리하는 것이 좋다. 왜냐하면 스레드를 100% 사용하면서 분할과 결합의 오버헤드도 최소화 할 수 있기 때문이다.
하지만 작업 시간이 다른 경우를 고려한다면 한 스레드당 1개의 작업 보다는 더 잘게 쪼개어 두는 것이 좋다.
ForkJoinPool은 스레드의 작업이 끝나면 다른 스레드가 처리하지 못하고 대기하는 작업을 훔쳐서 처리하는 기능을 제공하기 때문이다.
그리고 실질적으로는 작업 시간이 완전히 균등하지 않은 경우가 많다. 이런 상황에서 최적의 임계값 선택을 위해 고려해야 할 요소들은 다음과 같다.
- 작업의 복잡성: 작업이 단순하면 분할 오버헤드가 더 크게 작용한다. 작업이 복잡할수록 더 작은 단위로 나누는 것이 유리할 수 있다. 예를 들어
1 + 2 + 3 + 4의 아주 단순한 연산을1 + 2,3 + 4로 분할하게 되면 분할하고 합하는 비용이 더 든다. - 작업의 균일성: 작업 처리 시간이 불균일할수록 작업 훔치기(
work stealing)가 효과적으로 작동하도록 적절히 작은 크기로 분할하는 것이 중요하다. - 시스템 하드웨어: 코어 수, 캐시 크기, 메모리 대역폭 등 하드웨어 특성에 따라 최적의 작업 크기가 달라진다.
- 스레드 전환 비용: 너무 작은 작업은 스레드 관리 오버헤드가 증가할 수 있다.
적절한 작업의 크기에 대한 정답은 없지만, CPU 바운드 작업이라고 가정할 때, CPU 코어수에 맞추어 스레드를 생성하고. 작업 수는 스레드 수에 4~ 10배 정도로 생성하자. 물론 작업의 성격에 따라 다르다. 그리고 성능 테스트를 통해 적절한 값으로 조절하면 된다.
Fork/Join 프레임워크3 - 공용 풀
자바 8에서는 공용 풀(Common Pool) 개념이 도입되었는데, Fork/Join 작업을 위한 자바가 제공하는 기본 스레드 풀이다.
ForkJoinPool commonPool = ForkJoinPool.commonPool();
- 시스템 전체에서 공유: 애플리케이션 내에서 단일 인스턴스로 공유되어 사용된다.
- 자동 생성: 별도로 생성하지 않아도
ForkJoinPool.commonPool()을 통해 접근할 수 있다. - 편리한 사용: 별도의 풀을 만들지 않고도
RecursiveTask/RecursiveAction을 사용할 때 기본적으로 이 공용 풀이 사용된다. - 병렬 스트림 활용: 자바 8의 병렬 스트림은 내부적으로 이 공용 풀을 사용한다. (뒤에서 설명한다.)
- 자원 효율성: 여러 곳에서 별도의 풀을 생성하는 대신 공용 풀을 사용함으로써 시스템 자원을 효율적으로 관리할 수 있다.
- 병렬 수준 자동 설정: 기본적으로 시스템의 가용 프로세서 수에서 1을 뺀 값으로 병렬 수준(
parallelism)이 설정된다.- CPU 코어 8개 7개의 스레드가 사용됨
개발자는 Fork/Join 공용 풀을 이용하여 Fork/Join 풀을 편리하게 사용할 수 있다.
public class ForkJoinMain2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int processorCount = Runtime.getRuntime().availableProcessors();
ForkJoinPool commonPool = ForkJoinPool.commonPool();
List<Integer> data = IntStream.rangeClosed(1, 8)
.boxed()
.toList();
SumTask task = new SumTask(data); // RecursiveTask 사용
// task.invoke() 호출 -> 내부적으로 common pool 사용
Integer result = task.invoke(); // 공용 풀 사용
log("최종 결과: " + result);
}
// SumTask는 이전 예제와 동일 (THRESHOLD=2)
static class SumTask extends RecursiveTask<Integer> {} // 생략
}
- 이전 예제에서는 직접 ForkJoinPool을 생성하고
pool.invoke(task)를 통해 풀에 직접 작업을 요청했다. - 이번 예제에서는
task.invoke()를 통해 작업(RecursiveTask)에 있는invoke()를 직접 호출했다. Integer result = task.invoke();여기서 사용한invoke()메서드는 현재 스레드(메인)에서 작업을 시작하여compute()를 호출하고,fork()로 작업을 분할하여 공용 풀에 있는 워커 스레드들이 분할된 작업을 수행한다.- 처음 compute()는 메인스레드가 호출하고, compute 내의 fork를 호출하면 공용 풀의 스레드로 작업이 분할된다.
- leftTask는 공용 풀 스레드, rightTask는 메인 스레드가 실행한다. 메인 스레드도 작업을 도와주는거다.
- 메인 스레드는 최종 결과가 나올 때 까지 블로킹되야 한다. 그냥 대기하는 것 보단 작업을 도와주는게 효율적이다.
invoke(): 호출 스레드가 작업을 도우면서 대기(블로킹)한다. 작업의 결과를 반환 받는다.fork(): 작업을 비동기로 호출하려면invoke()대신에fork()를 호출하면 된다.Future(ForkJoinTask)를 반환 받는다.
정리
- 공용 풀은 JVM이 종료될 때까지 계속 유지되므로, 별도로 풀을 종료(
shutdown())하지 않아도 된다. - 이렇게 공용 풀을 활용하면, 별도로 풀을 생성/관리하는 코드를 작성하지 않아도 간편하게 병렬 처리를 구현할 수 있다.
공용 풀 vs 커스텀 풀
- 자원 관리: 커스텀 풀은 명시적으로 생성하고 관리해야 하지만, 공용 풀은 시스템에서 자동으로 관리된다.
- 재사용성: 공용 풀은 여러 곳에서 공유할 수 있어 자원을 효율적으로 사용할 수 있다.
- 설정 제어: 커스텀 풀은 병렬 수준(스레드의 숫자), 스레드 팩토리 등을 세부적으로 제어할 수 있지만, 공용 풀은 기본 설정을 사용한다.
- 라이프사이클: 커스텀 풀은 명시적으로 종료해야 하지만, 공용 풀은 JVM이 관리한다. 따라서 종료하지 않아도 된다.
참고로 공용 풀 설정은 시스템 속성으로 변경할 수 있지만 권장하지 않는다고 한다.
공용 풀이 CPU - 1만큼 스레드를 생성하는 이유
- 메인 스레드의 참여: 메인 스레드도 작업을 도와주므로, (코어 개수 - 1)개의 워커 스레드 + 1개의 메인스레드로 CPU 코어 활용
- 다른 프로세스와의 자원 경쟁 고려: OS, GC, 다른 Application 같은 내부 작업들도 처리해야 하는데, 모든 코어를 최대치로 활용하면 다른 작업들이 지연되거나 컨텍스트 스위칭 비용이 증가할 수 있다. 코어 여유분 1개를 남겨둬서 시스템 성능을 안정적으로 유지하려는 목적이 있다.
- 효율적인 자원 활용: 특정 상황(다른 작업 스레드나 OS 레벨 작업)에서도 병목을 일으키지 않는 선에서 효율적으로 CPU를 활용
자바 병렬 스트림: 병렬 스트림(parallel())
병렬 스트림은 Fork/Join 공용 풀을 사용해서 병렬 연산을 수행한다.
스트림에 parallel() 추가
long startTime = System.currentTimeMillis();
int sum = IntStream.rangeClosed(1, 8)
.parallel() // 추가
.map(HeavyJob::heavyTask)
.reduce(0, (a, b) -> a + b);
long endTime = System.currentTimeMillis();
log("time: " + (endTime - startTime) + "ms, sum: " + sum);
14:56:32.773 [ main] processorCount = 14, commonPool = 13
14:56:32.777 [ main] calculate 6 -> 60
14:56:32.777 [ForkJoinPool.commonPool-worker-2] calculate 2 -> 20
14:56:32.777 [ForkJoinPool.commonPool-worker-3] calculate 4 -> 40
14:56:32.777 [ForkJoinPool.commonPool-worker-1] calculate 3 -> 30
14:56:32.777 [ForkJoinPool.commonPool-worker-5] calculate 7 -> 70
14:56:32.778 [ForkJoinPool.commonPool-worker-6] calculate 1 -> 10
14:56:32.778 [ForkJoinPool.commonPool-worker-4] calculate 8 -> 80
14:56:32.778 [ForkJoinPool.commonPool-worker-7] calculate 5 -> 50
14:56:33.783 [ main] time: 1006ms, sum: 360
- 로그를 보면
ForkJoinPool.commonPool-worker-N스레드들이 동시에 일을 처리하고 있다. - 예제1에서 8초 이상 걸렸던 작업이, 이 예제에서는 모두 병렬로 실행되어 시간이 약 1초로 크게 줄어든다.
- 만약 CPU 코어가 4개라면 공용 풀에는 3개의 스레드가 생성된다. 따라서 시간이 더 걸릴 수 있다.
- 직접 스레드를 만들 필요 없이 스트림에
parallel()메서드만 호출하면, 스트림이 자동으로 병렬 처리된다.
동작 원리parallel() 한 줄만 작성했는데 병렬로 처리된다. 병렬 스트림은 내부에서 공용 ForkJoinPool을 사용하기 때문이다.
스트림에서 parallel()를 선언하면 스트림은 공용 ForkJoinPool 을 사용하고, 내부적으로 병렬 처리 가능한 스레드 숫자와 작업의 크기 등을 확인하면서, Spliterator 를 통해 데이터를 자동으로 분할한다. 분할 방식은 데이터 소스의 특성에 따라 최적화되어 있다. 그리고 공용 풀을 통해 작업을 적절한 수준으로 분할(Fork), 처리(Execute)하고, 그 결과를 모은다(Join)
(구체적으로 어떻게 병렬로 처리할지는 자바에게 위임 선언형 프로그래밍 스타일이다.)
병렬 스트림 사용시 주의점1
병렬 스트림은 실무에서 쓸 일이 많지는 않고, 쓰더라도 굉장히 조심히 써야 한다.
병렬 스트림은 Fork/Join 공용 풀을 사용한다. Fork/Join 공용 풀은 CPU 바운드 작업(계산 집약적인 작업)을 위해 설계되었다. 따라서 스레드가 주로 대기해야 하는 I/O 바운드 작업에는 적합하지 않다.
주의사항 - Fork/Join 프레임워크는 CPU 바운드 작업에만 사용해라!
- CPU 사용률이 높고 I/O 대기 시간이 적은 CPU 바운드 작업의 경우, 물리적인 CPU 코어와 비슷한 수의 스레드를 사용하는 것이 최적의 성능을 발휘할 수 있다.
- 스레드가 코어 수보다 많아지면 컨텍스트 스위칭 비용도 증가하고, 스레드 간 경쟁으로 성능이 저하될 수 있다.
만약에 I/O 작업을 ForkJoinPool에서 처리한다면?
- 스레드 블로킹에 따른 CPU 낭비
ForkJoinPool은 CPU 코어 수에 맞춰 제한된 개수의 스레드를 사용한다. (특히 공용 풀)- I/O 작업으로 스레드가 블로킹되면 CPU가 놀게 되어, 전체 병렬 처리 효율이 크게 떨어진다.
- 컨텍스트 스위칭 오버헤드 증가
- I/O 작업 때문에 스레드를 늘리면, 실제 연산보다 대기 시간이 길어지는 상황이 발생할 수 있다.
- 스레드가 많아질수록 컨텍스트 스위칭(
context switching) 비용도 증가하여 오히려 성능이 떨어질 수 있다.
- 작업 훔치기 기법 무력화
ForkJoinPool이 제공하는 작업 훔치기 알고리즘은, CPU 바운드 작업에서 빠르게 작업 단위를 계속 처리하도록 설계되었다. (작업을 훔쳐서 쉬는 스레드 없이 계속 작업)- I/O 대기 시간이 많은 작업은 스래드가 I/O로 인해 대기하고 있는 경우가 많아, 작업 훔치기가 빛을 발휘하기 어렵고, 결과적으로 병렬 처리의 장점을 살리기 어렵다.
- 분할-정복(작업 분할) 이점 감소
Fork/Join방식을 통해 작업을 잘게 나누어도, I/O 병목이 발생하면 CPU 병렬화 이점이 크게 줄어든다.- 오히려 분할된 작업들이 각기 I/O 대기를 반복하면서,
fork(),join()에 따른 오버헤드만 증가할 수 있다.
블로킹 작업이나 특수한 설정이 필요한 경우에는 커스텀 풀을 고려해야 한다.
- CPU 바운드 작업이라면
ForkJoinPool을 통해 병렬 계산을 극대화할 수 있지만, I/O 바운드 작업은 별도의 전용 스레드 풀을 사용하는 편이 더 적합하다. 예)Executors.newFixedThreadPool()등등
병렬 스트림 - 예제5
public static void main(String[] args) throws InterruptedException {
// 병렬 수준 3으로 제한
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "3");
// 요청 풀 추가
ExecutorService requestPool = Executors.newFixedThreadPool(100);
// 요청 스레드 수
int nThreads = 3; // 1, 2, 3, 10, 20
for (int i = 1; i <= nThreads; i++) {
String requestName = "request" + i;
requestPool.submit(() -> logic(requestName));
Thread.sleep(100); // 스레드 순서를 확인하기 위해 약간 대기
}
requestPool.close();
}
private static void logic(String requestName) {
long startTime = System.currentTimeMillis();
int sum = IntStream.rangeClosed(1, 4)
.parallel() // 주석 처리가 더 빨리 처리될 수 있음
.map(i -> HeavyJob.heavyTask(i, requestName))
.reduce(0, (a, b) -> a + b);
long endTime = System.currentTimeMillis();
log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
- 3개의 요청 스레드가 각각 (1~4)에 대한 작업을 수행한다.
- 첫 번째 요청은 1초가 걸리지만, 두 번째 요청은 약 2초, 세 번째 요청은 약 3초 정도 걸린다. 왜 지연이 발생할까?
- 공용 풀의 제한된 병렬성
- 공용 풀은 병렬 수준(
parallelism)이 3으로 설정되어 있어, 최대 3개의 작업만 동시에 처리할 수 있다. 여기에 요청 스레드도 자신의 작업에 참여하므로 각 작업당 총 4개의 스레드만 사용된다. - 따라서 총 12개의 요청(각각 4개의 작업)을 처리하는데 필요한 스레드 자원이 부족하다.
- 공용 풀은 병렬 수준(
- 처리 시간의 불균형
- 첫 번째 요청은 거의 모든 공용 풀 워커를 사용할 수 있었지만, 이후 요청들은 제한된 공용 풀 자원을 두고 경쟁해야 한다. 따라서 완료 시간이 점점 느려진다.
- 스레드 작업 분배: 일부 작업은 요청 스레드에서, 일부는 공용 풀에서 처리된다. 요청 스레드가 도와줘도 공용 풀의 스레드가 부족하여 한계가 있다.
요청이 증가할 수록 문제는 더 심해진다. nThreads를 키워주면, 응답 시간이 확연하게 늘어난다.
이렇듯 공용 풀 병목 현상, 자원 경쟁, 예측 불가능한 성능와 같은 문제점으로 인해, WAS에서 수 많은 요청에 공용 풀을 사용하는 건 위험하다. 실무에서 공용 풀은 절대로 I/O 바운드 작업을 하면 안된다. 스레드가 모두 I/O 작업을 대기하면, Application에서 공용 풀을 사용하는 다른 모든 요청이 밀린다. 공용 풀은 반드시 CPU 바운드 작업에만 사용해야 한다. 애초에 설계도 이 목적이다.
(이런 경우 오히려 parallel()을 제거하는 게 더 빠를 수 있다.)
I/O 바운드 작업이 많을 때 병렬 처리는 스레드를 직접 다루거나, ExecutorService 등을 통해 별도의 스레드 풀을 사용해야 한다.
병렬 스트림 사용시 주의점2:
별도의 스레드 풀로 개선해보자
// logic 처리 전용 스레드 풀
ExecutorService logicPool = Executors.newFixedThreadPool(400);
private static void logic(String requestName, ExecutorService es) { // 로직 풀 받음
log("[" + requestName + "] START");
long startTime = System.currentTimeMillis();
// 1부터 4까지의 작업을 각각 스레드 풀에 제출
Future<Integer> f1 = es.submit(() -> HeavyJob.heavyTask(1, requestName));
Future<Integer> f2 = es.submit(() -> HeavyJob.heavyTask(2, requestName));
Future<Integer> f3 = es.submit(() -> HeavyJob.heavyTask(3, requestName));
Future<Integer> f4 = es.submit(() -> HeavyJob.heavyTask(4, requestName));
// Future 결과 취합
int sum;
try {
Integer v1 = f1.get();
Integer v2 = f2.get();
Integer v3 = f3.get();
Integer v4 = f4.get();
sum = v1 + v2 + v3 + v4;
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
- 별도의 스레드 풀을 생성하여 병렬 작업 처리에 사용
- 병렬 스트림을 사용하지 않고 직접 전용 스레드 풀에 작업을 제출한다. 각 작업은
Future로 관리된다. - 모든 요청이 약 1초 내로 처리되고, 처리 시간이 일관된다. 요청 지연 문제 해결!
- 확장성 향상: 400개의 스레드를 가진 풀을 사용함으로써, 동시에 여러 요청을 효율적으로 처리한다.
스트림으로 코드를 개선하면
private static void logic(String requestName, ExecutorService es) {
log("[" + requestName + "] START");
long startTime = System.currentTimeMillis();
// 1부터 4까지의 작업을 각각 스레드 풀에 제출하고 Future 리스트 생성
List<Future<Integer>> futures = IntStream.rangeClosed(1, 4)
.mapToObj(i -> es.submit(() -> HeavyJob.heavyTask(i, requestName)))
.toList();
// Future 결과 취합 (Stream API 사용)
int sum = futures.stream()
.mapToInt(f -> {
try {
return f.get(); // Future 결과 가져오기 (블로킹)
} catch (Exception e) {
throw new RuntimeException(e);
}
}).sum();
long endTime = System.currentTimeMillis();
log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
CompletableFuture와 주의 사항
특히 실무에서 자주 하는 실수가 병렬 스트림을 I/O 대기 작업에 사용하거나, 또는 CompletableFuture를 사용할 때 발생한다
- 실무에서 복잡한 멀티스레드 코드를 작성할 때는
CompletableFuture가 도움이 된다. - CompletableFuture는 스레드 요청 처리를 스트림처럼 메서드 체인, 람다를 이용해서 편하게 쓸 수 있게 해준다.
CompletableFuture를 생성할 때는 별도의 스레드 풀을 반드시 지정해야 한다. 그렇지 않으면 Fork/Join 공용 풀이 대신 사용된다. 이 때문에 많은 장애가 발생한다.CompletableFuture를 사용할 때는 반드시! 커스텀 풀을 지정해서 사용하자!
public static void main(String[] args) {
// 기본: ForkJoinPool.commonPool() 사용
CompletableFuture.runAsync(() -> log("Fork/Join")); // Fork/Join 공용 풀
// ExecutorService 직접 지정: 전용 스레드 풀 사용
ExecutorService es = Executors.newFixedThreadPool(100);
CompletableFuture.runAsync(() -> log("Custom Pool"), es); // 별도의 풀
es.close(); // ExecutorService 종료
}
'Java > Modern Java(8~)' 카테고리의 다른 글
| [Java 8+] Map과 merge(), compute() (0) | 2025.08.30 |
|---|---|
| [Java] 67. 디폴트 메서드 (0) | 2025.07.04 |
| [Java] 66. Optional (0) | 2025.07.04 |
| [Java] 65. 스트림 API3 - 컬렉터 (0) | 2025.07.04 |
| [Java] 64. 스트림 API2 - 기능 (0) | 2025.07.04 |