[Java] 68. 병렬 스트림

2025. 7. 4. 23:58·Java/Modern Java(8~)

 

 

병렬 스트림

#Java/adv3


단일 스트림

HeavyJob 클래스는 오래 걸리는 작업을 시뮬레이션하는데, 각 작업은 1초 정도 소요된다고 가정하겠다. 입력값에 10을 곱한 결과를 반환하며, 작업이 실행될 때마다 로그를 출력한다.


예제1 - 단일 스트림

단일 스트림(sequential stream)으로 IntStream.rangeClosed(1, 8)에서 나온 1부터 8까지의 숫자 각각에 대해 heavyTask()를 수행

long startTime = System.currentTimeMillis();

int sum = IntStream.rangeClosed(1, 8)
        .map(HeavyJob::heavyTask)
        .reduce(0, (a, b) -> a + b); // sum()

long endTime = System.currentTimeMillis();
log("time: " + (endTime - startTime) + "ms, sum: " + sum);
  • 총 8초가 소요된다(1초씩 * 8)

스레드 직접 사용

스레드를 사용하여 작업을 빨리 처리해보자. 작업을 분할한 뒤 여러 스레드로 작업을 동시에 처리한다.

// 1. Fork 작업을 분할한다.
SumTask task1 = new SumTask(1, 4);
SumTask task2 = new SumTask(5, 8);
Thread thread1 = new Thread(task1, "thread-1");
Thread thread2 = new Thread(task2, "thread-2");

// 2. 분할한 작업을 처리한다.
thread1.start();
thread2.start();

// 3. join 처리한 결과를 합친다.
thread1.join();
thread2.join();
log("main 스레드 대기 완료");

int sum = task1.result + task2.result;
  • SumTask는 Runnable 을 구현했고, 내부에서 1초씩 걸리는 heavyTask() 를 루프 돌면서 합산한다.
    • 고급 1편에서 구현했던 클래스다.
  • thread1.start(), thread2.start()로 각 스레드가 동시에 작업을 시작하고, thread1.join(), thread2.join()으로 두 스레드가 끝날 때까지 main 스레드가 대기한다. 이후 최총 합계를 구한다.
  • 작업을 두 스레드가 분할하여 처리했기 때문에 8초 걸리던 작업을 4초로 줄였다.
  • 하지만 이렇게 스레드를 직접 사용하면 스레드 수가 늘어나면 코드가 복잡해지고, 예외 처리, 스레드 풀 관리 등 추가 관리 포인트가 생기는 문제가 있다.

스레드 풀 사용:

ExecutorService를 사용해서 편리하게 병렬 처리를 하도록 개선

// 스레드 풀을 준비한다.
ExecutorService es = Executors.newFixedThreadPool(2);

// 1. 작업을 분할한다.
SumTask task1 = new SumTask(1, 4);
SumTask task2 = new SumTask(5, 8);

// 2. 분할한 작업을 처리한다.
Future<Integer> future1 = es.submit(task1);
Future<Integer> future2 = es.submit(task2);

// 3. join 처리한 결과를 합친다. get: 결과가 나올 때 까지 대기한다.
Integer result1 = future1.get();
Integer result2 = future2.get();

int sum = result1 + result2;
es.close();
  • 이전 예제와 마찬가지로 2개의 스레드가 병렬로 계산을 처리하므로 약 4초가 소요된다.
  • 하지만 여전히 코드 레벨에서 분할/병합 로직을 직접 짜야 하고, 스레드 풀 생성과 관리도 개발자가 직접해야 한다.

Fork/Join 패턴

분할(Fork), 처리(Execute), 모음(Join)

  • 하나의 큰 작업을 여러 스레드가 처리할 수 있는 작은 단위의 작업으로 분할(Fork)한 뒤, 이렇게 분할한 작업을 각각의 스레드가 처리(Execute)한다.
  • 이렇게 분할(Fork) → 처리(Execute) → 모음(Join)의 단계로 이루어진 멀티스레딩 패턴을 Fork/Join 패턴이라고 부른다.
    • 병렬 프로그래밍에서 매우 효율적인 방식으로, 복잡한 작업을 병렬적으로 처리할 수 있게 해준다.
  1. 작업 분할: 큰 작업을 여러 작은 작업으로 쪼개어(Fork) 각각의 스레드나 작업 단위로 할당하는 것을 포크(Fork)라 한다.
    • 포크에서 여러 갈래로 나뉘는 것이 작업을 분할하는 거랑 비슷해서 Fork라고 한다나 뭐라나
  2. 처리(Execute): 스레드가 분할된 각각의 작업을 처리
  3. Join 모음, 결과 합치기: 분할된 작업들이 모두 끝나면, 각 스레드 혹은 작업 단위별 결과를 하나로 합쳐야한다. Join은 이렇게 갈라진 작업들이 모두 끝난 뒤, 다시 합류하여 하나로 결과를 모으는 모습을 의미한다.

자바는 Fork/Join 프레임워크를 제공해서 개발자가 Fork/Join 패턴을 더 쉽게 구현할 수 있도록 지원한다.


Fork/Join 프레임워크1 - 소개

자바의 Fork/Join 프레임워크는 자바 7부터 도입된 java.util.concurrent 패키지의 일부로, 멀티코어 프로세서를 효율적으로 활용하기 위한 병렬처리 프레임워크다.


분할 정복(Divide and Conquer) 전략

  • 큰 작업(task)을 작은 단위로 재귀적으로 분할(fork)
  • 각 작은 작업의 결과를 합쳐(join) 최종 결과를 생성
  • 멀티코어 환경에서 작업을 효율적으로 분산 처리

작업 훔치기(Work Stealing) 알고리즘

  • 각 스레드는 자신의 작업 큐를 가짐
  • 작업이 없는 스레드는 다른 바쁜 스레드의 큐에서 작업을 "훔쳐와서" 대신 처리
  • 부하 균형을 자동으로 조절하여 효율성 향상

주요 클래스

  • ForkJoinPool
    • Fork/Join 작업을 실행하는 특수한 ExecutorService 스레드 풀
    • 작업 스케줄링 및 스레드 관리를 담당
    • 디폴트로 사용 가능한 프로세서 수 만큼 스레드 생성(CPU 코어 수만큼 생성)
    • 분할 정복과 작업 훔치기에 특화된 스레드 풀
  • ForkJoinTask
    • Fork/Join 작업의 기본 추상 클래스
    • Future를 구현했다.
    • 주로 두 하위 클래스를 구현해서 사용한다.
      • RecursiveTask<V> : 결과를 반환하는 작업
      • RecursiveAction: 결과를 반환하지 않는 작업(void)
      • 구현 방법:
        • compute() 메서드를 재정의해서 필요한 작업 로직을 작성한다.
        • 일반적으로 일정 기준(임계값)을 두고, 작업 범위가 작으면 직접 처리하고, 크면 작업을 둘로 분할하여 각각 병렬로 처리하도록 구현한다.

fork() / join() 메서드

  • fork(): 현재 스레드에서 다른 스레드로 작업을 분할하여 보내는 동작(비동기 실행)
  • join(): 분할된 작업이 끝날 때까지 기다린 후 결과를 가져오는 동작

우리가 직접 Fork/Join 프레임워크를 실무에서 직접 다룰 일은 드물다.


Fork/Join 프레임워크 활용

위 예시를 Fork/Join 프레임워크로 구현


핵심은 작업의 크기가 임계값 보다 크면 분할하고, 임계값 보다 같거나 작으면 직접 처리하는 것이다.
작업 크기가 8, 임계값이 4라고 하면

  1. Fork: 작업의 크기가 8이면 임계값을 넘었다. 따라서 작업을 절반으로 분할한다.
  2. Execute: 다음으로 작업의 크기가 4라면 임계값의 범위 안에 들어오므로 작업을 분할하지 않고, 처리한다.
  3. Join: 최종 결과를 합친다.

SumTask는 RecursiveTask<Integer> 를 상속받아 리스트의 합을 계산하는 작업을 병렬로 처리하는 클래스이다.
이 클래스는 Fork/Join 프레임워크의 분할 정복 전략을 구현한다. RecursiveTask.compute()메서드를 재정의해야 하자.

public class SumTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 4; // 임계값
    private final List<Integer> list;

    public SumTask(List<Integer> list) {
        this.list = list;
    }

    @Override
    protected Integer compute() {
        // 작업 범위가 작으면 직접 합산
        if (list.size() <= THRESHOLD) {
            log("[처리 시작] " + list);
            int sum = list.stream()
                    .mapToInt(HeavyJob::heavyTask)
                    .sum();
            log("[처리 완료] " + list + " -> sum: " + sum);
            return sum;
        } else {
            // 작업 범위가 크면 반으로 나누어 병렬 처리
            int mid = list.size() / 2;
            List<Integer> leftList = list.subList(0, mid);
            List<Integer> rightList = list.subList(mid, list.size());
            log("[분할] " + list + " -> LEFT" + leftList + ", RIGHT" + rightList);

            SumTask leftTask = new SumTask(leftList);
            SumTask rightTask = new SumTask(rightList);

            // 왼쪽 작업은 다른 스레드에서 처리
            leftTask.fork();
            // 오른쪽 작업은 현재 스레드에서 처리
            int rightResult = rightTask.compute();

            // 왼쪽 작업 결과를 기다림
            int leftResult = leftTask.join();

            // 왼쪽과 오른쪽 작업 결과를 합침
            int joinSum = leftResult + rightResult;
            log("LEFT[" + leftResult + "] + RIGHT[" + rightResult + "] -> sum: " + joinSum);
            return joinSum;
        }
    }
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
    List<Integer> data = IntStream.rangeClosed(1, 8)
            .boxed()
            .toList();

    // ForkJoinPool 생성 및 작업 수행
    ForkJoinPool pool = new ForkJoinPool(10); // 최대 10개 스레드 사용

    long startTime = System.currentTimeMillis();
    SumTask task = new SumTask(data);

    // 병렬로 합을 구한 후 결과 출력
    int result = pool.invoke(task); // 작업 시작 및 완료 대기
    pool.close(); // 풀 종료

    long endTime = System.currentTimeMillis();
    log("time: " + (endTime - startTime) + "ms, sum: " + result);
    log("pool: " + pool);
}
  1. 데이터 생성: IntStream.rangeClosed(1, 8)를 사용해 1부터 8까지의 숫자 리스트를 생성한다.
  2. ForkJoinPool 생성:
    • new ForkJoinPool(10) 으로 최대 10개의 스레드를 사용할 수 있는 풀을 생성한다.
    • 참고로 기본 생성자(new ForkJoinPool())를 사용하면 시스템의 프로세서 수에 맞춰 스레드가 생성된다.
  3. invoke(): 메인 스레드가 pool.invoke(task)를 호출하면 SumTask 를 스레드 풀에 전달한다. SumTask는 ForkJoinPool 에 있는 별도의 스레드에서 실행된다. 메인 스레드는 작업이 완료될 때까지 기다린 후 결과를 받는다.
  4. pool.close(): 더 이상 작업이 없으므로 풀을 종료한다.
  5. 결과 출력: 계산된 리스트의 합과 실행 시간을 출력한다.

실행 결과

  • 4초 소요
  • [main] pool: java.util.concurrent.ForkJoinPool@57baeedf[Terminated, parallelism = 10, size = 0, active = 0, running = 0, steals = 2, tasks = 0, submissions = 0]


정리

  • Fork/Join 프레임워크를 사용하면 RecursiveTask 를 통해 작업을 재귀적으로 분할하는 것을 확인할 수 있다. 여기서는 작업을 단순히 2개로만 분할해서 스레드도 동시에 2개만 사용할 수 있었다.
  • THRESHOLD (임계값)을 더 줄여서 작업을 더 잘게 분할하면 더 많은 스레드를 활용할 수 있다. 물론 이 경우 풀의 스레드 수도 2개보다 더 많아야 효과가 있다.
  • 실제로는 작업 훔치기 과정이 포함되어 있다. 아래서 알아보자.

Fork/Join 프레임워크2 - 작업 훔치기

임계값을 2로 설정하면, 2초의 시간이 소요된다.


작업 훔치기 알고리즘

지금까지 설명을 단순화 하기 위해 작업 훔치기(Work-Stealing) 알고리즘은 설명하지 않았다. 작업 훔치기가 뭔지 알아보자.

  • Fork/Join 풀의 스레드는 각자 자신의 작업 큐를 가진다.
    • 덕분에 작업을 큐에서 가져가기 위한 스레드간 경합이 줄어든다.
  • 그리고 자신의 작업이 없는 경우, 그래서 스레드가 할 일이 없는 경우에 다른 스레드의 작업 큐에 대기중인 작업을 훔쳐서 대신 처리한다.

과정

  • ForkJoinPool 에 작업을 요청하면 ForkJoinPool 내부에 있는 외부 작업 큐에 작업이 저장된다.
  • 포크 조인 풀의 스레드는 만약 자신이 할 일이 없고, 자신의 작업 큐에도 작업이 없는 경우 다른 작업 큐에 있는 작업을 훔쳐서 대신 처리할 수 있다.
    • w1 스레드는 자신이 처리할 일이 없으므로 다른 작업 큐의 작업을 훔친다. 여기서는 외부 작업 큐에 들어 있는 작업을 훔쳐서 대신 처리한다.
  • w1 은 훔친 작업의 compute()를 호출하면서 작업을 시작한다.
  • w1 은 작업의 크기가 크다고 평가하고 작업을 둘로 분할한다.
    • [1,2,3,4]의 작업은 fork를 호출해서 비동기로 다른 스레드가 실행해주길 기대한다.
    • 사실 fork()는 다른 스레드에 작업을 요청하는게 아니라, 스레드 자신의 작업 큐에 작업을 넣어두는 것이다.
    • 이후에 자신이 여유가 되면 스스로 보관한 작업을 처리하고, 자신이 여유가 없고 쉬는 스레드가 있다면 쉬는 스레드가 작업을 훔쳐가서 대신 처리한다.
  • w1은 [5,6,7,8] 작업을 분할한다.
    • [5,6]은 Fork를 통해서 자신의 작업 큐에 보관한다.
    • [7, 8]은 compute를 호출해서 스스로 처리한다.
  • w1 의 작업 큐에 작업이 2개나 대기 중이다. 쉬고 있는 w2 가 w1 의 작업 [1,2,3,4]를 훔친다.
    • 참고로 여기에 있는 큐는 데크에 가깝다. 따라서 양쪽으로 넣고 뺄 수 있는 구조이다. 경합이 덜 발생한다.
  • w2 는 작업의 크기가 크다고 평가하고 작업을 둘로 분할한다.
    • [1,2]의 작업은 fork를 호출해서 자신의 작업 큐에 넣어둔다.
    • [3,4]의 작업은 compute를 호출해서 스스로 처리한다. (재귀 호출)
  • 작업 큐에 남아있는 작업들을 w3, w4 스레드가 훔쳐가서 실행한다.
    • w3: w1 의 작업 큐 [5,6]을 훔쳐서 처리
    • w4: w2의 작업 큐 [1,2]를 훔쳐서 처리

작업 훔치기 알고리즘
이 예제에서는 작업량이 균등하게 분배되었지만, 실제 상황에서 작업량이 불균형할 경우 작업 훔치기 알고리즘이 동작하여 유휴 스레드가 다른 바쁜 스레드의 작업을 가져와 처리함으로써 전체 효율성을 높일 수 있다.


Fork/Join 적절한 작업 크기 선택

적절한 임계값 설정은 병렬 처리의 효율성에 큰 영향을 미치므로, 작업의 특성과 시스템 환경에 맞게 조정하는 것이 중요하다.


너무 작은 단위로 작업을 분할하면 스레드 생성과 관리에 드는 오버헤드가 커질 수 있으며, 너무 큰 단위로 분할하면 병렬 처리의 이점을 충분히 활용하지 못할 수 있다.


예) 1 ~ 1000까지 처리해야 하는 작업, 스레드는 10개

  • 1개 단위로 쪼개는 경우: 1000개의 분할과 결합이 필요. 한 스레드당 100개의 작업 처리
    • 스레드가 작업을 찾고 관리하는 부분도 늘어나고, 분할과 결과를 합하는 과정의 오버헤드도 너무 크다.
  • 10개 단위로 쪼개는 경우: 100개의 분할과 결합이 필요. 한 스레드당 10개의 작업 처리
  • 100개 단위로 쪼개는 경우: 10개의 분할과 결합이 필요. 한 스레드당 1개의 작업 처리
  • 500개 단위로 쪼개는 경우: 2개의 분할과 결합이 필요. 스레드 최대 2개 사용 가능

작업시간이 완전히 동일하게 처리된다고 가정하면 이상적으로는 한 스레드당 1개의 작업을 처리하는 것이 좋다. 왜냐하면 스레드를 100% 사용하면서 분할과 결합의 오버헤드도 최소화 할 수 있기 때문이다.
하지만 작업 시간이 다른 경우를 고려한다면 한 스레드당 1개의 작업 보다는 더 잘게 쪼개어 두는 것이 좋다.

  • ForkJoinPool 은 스레드의 작업이 끝나면 다른 스레드가 처리하지 못하고 대기하는 작업을 훔쳐서 처리하는 기능을 제공하기 때문이다.

그리고 실질적으로는 작업 시간이 완전히 균등하지 않은 경우가 많다. 이런 상황에서 최적의 임계값 선택을 위해 고려해야 할 요소들은 다음과 같다.

  • 작업의 복잡성: 작업이 단순하면 분할 오버헤드가 더 크게 작용한다. 작업이 복잡할수록 더 작은 단위로 나누는 것이 유리할 수 있다. 예를 들어 1 + 2 + 3 + 4의 아주 단순한 연산을 1 + 2, 3 + 4로 분할하게 되면 분할하고 합하는 비용이 더 든다.
  • 작업의 균일성: 작업 처리 시간이 불균일할수록 작업 훔치기(work stealing)가 효과적으로 작동하도록 적절히 작은 크기로 분할하는 것이 중요하다.
  • 시스템 하드웨어: 코어 수, 캐시 크기, 메모리 대역폭 등 하드웨어 특성에 따라 최적의 작업 크기가 달라진다.
  • 스레드 전환 비용: 너무 작은 작업은 스레드 관리 오버헤드가 증가할 수 있다.

적절한 작업의 크기에 대한 정답은 없지만, CPU 바운드 작업이라고 가정할 때, CPU 코어수에 맞추어 스레드를 생성하고. 작업 수는 스레드 수에 4~ 10배 정도로 생성하자. 물론 작업의 성격에 따라 다르다. 그리고 성능 테스트를 통해 적절한 값으로 조절하면 된다.


Fork/Join 프레임워크3 - 공용 풀

자바 8에서는 공용 풀(Common Pool) 개념이 도입되었는데, Fork/Join 작업을 위한 자바가 제공하는 기본 스레드 풀이다.

ForkJoinPool commonPool = ForkJoinPool.commonPool();
  • 시스템 전체에서 공유: 애플리케이션 내에서 단일 인스턴스로 공유되어 사용된다.
  • 자동 생성: 별도로 생성하지 않아도 ForkJoinPool.commonPool() 을 통해 접근할 수 있다.
  • 편리한 사용: 별도의 풀을 만들지 않고도 RecursiveTask / RecursiveAction 을 사용할 때 기본적으로 이 공용 풀이 사용된다.
  • 병렬 스트림 활용: 자바 8의 병렬 스트림은 내부적으로 이 공용 풀을 사용한다. (뒤에서 설명한다.)
  • 자원 효율성: 여러 곳에서 별도의 풀을 생성하는 대신 공용 풀을 사용함으로써 시스템 자원을 효율적으로 관리할 수 있다.
  • 병렬 수준 자동 설정: 기본적으로 시스템의 가용 프로세서 수에서 1을 뺀 값으로 병렬 수준(parallelism)이 설정된다.
    • CPU 코어 8개 7개의 스레드가 사용됨

개발자는 Fork/Join 공용 풀을 이용하여 Fork/Join 풀을 편리하게 사용할 수 있다.


public class ForkJoinMain2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int processorCount = Runtime.getRuntime().availableProcessors();
        ForkJoinPool commonPool = ForkJoinPool.commonPool();

        List<Integer> data = IntStream.rangeClosed(1, 8)
                .boxed()
                .toList();

        SumTask task = new SumTask(data); // RecursiveTask 사용
        // task.invoke() 호출 -> 내부적으로 common pool 사용
        Integer result = task.invoke(); // 공용 풀 사용
        log("최종 결과: " + result);
    }
    // SumTask는 이전 예제와 동일 (THRESHOLD=2)
    static class SumTask extends RecursiveTask<Integer> {} // 생략
}

  • 이전 예제에서는 직접 ForkJoinPool을 생성하고 pool.invoke(task)를 통해 풀에 직접 작업을 요청했다.
  • 이번 예제에서는 task.invoke()를 통해 작업(RecursiveTask)에 있는 invoke()를 직접 호출했다.
  • Integer result = task.invoke(); 여기서 사용한 invoke() 메서드는 현재 스레드(메인)에서 작업을 시작하여 compute()를 호출하고, fork()로 작업을 분할하여 공용 풀에 있는 워커 스레드들이 분할된 작업을 수행한다.
    • 처음 compute()는 메인스레드가 호출하고, compute 내의 fork를 호출하면 공용 풀의 스레드로 작업이 분할된다.
    • leftTask는 공용 풀 스레드, rightTask는 메인 스레드가 실행한다. 메인 스레드도 작업을 도와주는거다.
      • 메인 스레드는 최종 결과가 나올 때 까지 블로킹되야 한다. 그냥 대기하는 것 보단 작업을 도와주는게 효율적이다.
    • invoke() : 호출 스레드가 작업을 도우면서 대기(블로킹)한다. 작업의 결과를 반환 받는다.
    • fork(): 작업을 비동기로 호출하려면 invoke() 대신에 fork()를 호출하면 된다. Future (ForkJoinTask)를 반환 받는다.

정리

  • 공용 풀은 JVM이 종료될 때까지 계속 유지되므로, 별도로 풀을 종료(shutdown())하지 않아도 된다.
  • 이렇게 공용 풀을 활용하면, 별도로 풀을 생성/관리하는 코드를 작성하지 않아도 간편하게 병렬 처리를 구현할 수 있다.

공용 풀 vs 커스텀 풀

  1. 자원 관리: 커스텀 풀은 명시적으로 생성하고 관리해야 하지만, 공용 풀은 시스템에서 자동으로 관리된다.
  2. 재사용성: 공용 풀은 여러 곳에서 공유할 수 있어 자원을 효율적으로 사용할 수 있다.
  3. 설정 제어: 커스텀 풀은 병렬 수준(스레드의 숫자), 스레드 팩토리 등을 세부적으로 제어할 수 있지만, 공용 풀은 기본 설정을 사용한다.
  4. 라이프사이클: 커스텀 풀은 명시적으로 종료해야 하지만, 공용 풀은 JVM이 관리한다. 따라서 종료하지 않아도 된다.

참고로 공용 풀 설정은 시스템 속성으로 변경할 수 있지만 권장하지 않는다고 한다.


공용 풀이 CPU - 1만큼 스레드를 생성하는 이유

  • 메인 스레드의 참여: 메인 스레드도 작업을 도와주므로, (코어 개수 - 1)개의 워커 스레드 + 1개의 메인스레드로 CPU 코어 활용
  • 다른 프로세스와의 자원 경쟁 고려: OS, GC, 다른 Application 같은 내부 작업들도 처리해야 하는데, 모든 코어를 최대치로 활용하면 다른 작업들이 지연되거나 컨텍스트 스위칭 비용이 증가할 수 있다. 코어 여유분 1개를 남겨둬서 시스템 성능을 안정적으로 유지하려는 목적이 있다.
  • 효율적인 자원 활용: 특정 상황(다른 작업 스레드나 OS 레벨 작업)에서도 병목을 일으키지 않는 선에서 효율적으로 CPU를 활용

자바 병렬 스트림: 병렬 스트림(parallel())

병렬 스트림은 Fork/Join 공용 풀을 사용해서 병렬 연산을 수행한다.


스트림에 parallel() 추가

long startTime = System.currentTimeMillis();

int sum = IntStream.rangeClosed(1, 8)
        .parallel() // 추가
        .map(HeavyJob::heavyTask)
        .reduce(0, (a, b) -> a + b);

long endTime = System.currentTimeMillis();
log("time: " + (endTime - startTime) + "ms, sum: " + sum);
14:56:32.773 [   main] processorCount = 14, commonPool = 13
14:56:32.777 [   main] calculate 6 -> 60
14:56:32.777 [ForkJoinPool.commonPool-worker-2] calculate 2 -> 20
14:56:32.777 [ForkJoinPool.commonPool-worker-3] calculate 4 -> 40
14:56:32.777 [ForkJoinPool.commonPool-worker-1] calculate 3 -> 30
14:56:32.777 [ForkJoinPool.commonPool-worker-5] calculate 7 -> 70
14:56:32.778 [ForkJoinPool.commonPool-worker-6] calculate 1 -> 10
14:56:32.778 [ForkJoinPool.commonPool-worker-4] calculate 8 -> 80
14:56:32.778 [ForkJoinPool.commonPool-worker-7] calculate 5 -> 50
14:56:33.783 [   main] time: 1006ms, sum: 360
  • 로그를 보면 ForkJoinPool.commonPool-worker-N 스레드들이 동시에 일을 처리하고 있다.
  • 예제1에서 8초 이상 걸렸던 작업이, 이 예제에서는 모두 병렬로 실행되어 시간이 약 1초로 크게 줄어든다.
    • 만약 CPU 코어가 4개라면 공용 풀에는 3개의 스레드가 생성된다. 따라서 시간이 더 걸릴 수 있다.
  • 직접 스레드를 만들 필요 없이 스트림에 parallel() 메서드만 호출하면, 스트림이 자동으로 병렬 처리된다.

동작 원리
parallel() 한 줄만 작성했는데 병렬로 처리된다. 병렬 스트림은 내부에서 공용 ForkJoinPool을 사용하기 때문이다.
스트림에서 parallel()를 선언하면 스트림은 공용 ForkJoinPool 을 사용하고, 내부적으로 병렬 처리 가능한 스레드 숫자와 작업의 크기 등을 확인하면서, Spliterator 를 통해 데이터를 자동으로 분할한다. 분할 방식은 데이터 소스의 특성에 따라 최적화되어 있다. 그리고 공용 풀을 통해 작업을 적절한 수준으로 분할(Fork), 처리(Execute)하고, 그 결과를 모은다(Join)
(구체적으로 어떻게 병렬로 처리할지는 자바에게 위임 선언형 프로그래밍 스타일이다.)


병렬 스트림 사용시 주의점1

병렬 스트림은 실무에서 쓸 일이 많지는 않고, 쓰더라도 굉장히 조심히 써야 한다.
병렬 스트림은 Fork/Join 공용 풀을 사용한다. Fork/Join 공용 풀은 CPU 바운드 작업(계산 집약적인 작업)을 위해 설계되었다. 따라서 스레드가 주로 대기해야 하는 I/O 바운드 작업에는 적합하지 않다.


주의사항 - Fork/Join 프레임워크는 CPU 바운드 작업에만 사용해라!

  • CPU 사용률이 높고 I/O 대기 시간이 적은 CPU 바운드 작업의 경우, 물리적인 CPU 코어와 비슷한 수의 스레드를 사용하는 것이 최적의 성능을 발휘할 수 있다.
  • 스레드가 코어 수보다 많아지면 컨텍스트 스위칭 비용도 증가하고, 스레드 간 경쟁으로 성능이 저하될 수 있다.

만약에 I/O 작업을 ForkJoinPool에서 처리한다면?

  1. 스레드 블로킹에 따른 CPU 낭비
    • ForkJoinPool 은 CPU 코어 수에 맞춰 제한된 개수의 스레드를 사용한다. (특히 공용 풀)
    • I/O 작업으로 스레드가 블로킹되면 CPU가 놀게 되어, 전체 병렬 처리 효율이 크게 떨어진다.
  2. 컨텍스트 스위칭 오버헤드 증가
    • I/O 작업 때문에 스레드를 늘리면, 실제 연산보다 대기 시간이 길어지는 상황이 발생할 수 있다.
    • 스레드가 많아질수록 컨텍스트 스위칭(context switching) 비용도 증가하여 오히려 성능이 떨어질 수 있다.
  3. 작업 훔치기 기법 무력화
    • ForkJoinPool 이 제공하는 작업 훔치기 알고리즘은, CPU 바운드 작업에서 빠르게 작업 단위를 계속 처리하도록 설계되었다. (작업을 훔쳐서 쉬는 스레드 없이 계속 작업)
    • I/O 대기 시간이 많은 작업은 스래드가 I/O로 인해 대기하고 있는 경우가 많아, 작업 훔치기가 빛을 발휘하기 어렵고, 결과적으로 병렬 처리의 장점을 살리기 어렵다.
  4. 분할-정복(작업 분할) 이점 감소
    • Fork/Join 방식을 통해 작업을 잘게 나누어도, I/O 병목이 발생하면 CPU 병렬화 이점이 크게 줄어든다.
    • 오히려 분할된 작업들이 각기 I/O 대기를 반복하면서, fork(), join()에 따른 오버헤드만 증가할 수 있다.

블로킹 작업이나 특수한 설정이 필요한 경우에는 커스텀 풀을 고려해야 한다.

  • CPU 바운드 작업이라면 ForkJoinPool을 통해 병렬 계산을 극대화할 수 있지만, I/O 바운드 작업은 별도의 전용 스레드 풀을 사용하는 편이 더 적합하다. 예) Executors.newFixedThreadPool() 등등

병렬 스트림 - 예제5

public static void main(String[] args) throws InterruptedException {
    // 병렬 수준 3으로 제한
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "3");

    // 요청 풀 추가
    ExecutorService requestPool = Executors.newFixedThreadPool(100);

	// 요청 스레드 수
    int nThreads = 3; // 1, 2, 3, 10, 20
    for (int i = 1; i <= nThreads; i++) {
        String requestName = "request" + i;
        requestPool.submit(() -> logic(requestName));
        Thread.sleep(100); // 스레드 순서를 확인하기 위해 약간 대기
    }
    requestPool.close();
}

private static void logic(String requestName) {
    long startTime = System.currentTimeMillis();

    int sum = IntStream.rangeClosed(1, 4)
            .parallel() // 주석 처리가 더 빨리 처리될 수 있음
            .map(i -> HeavyJob.heavyTask(i, requestName))
            .reduce(0, (a, b) -> a + b);

    long endTime = System.currentTimeMillis();
    log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
  • 3개의 요청 스레드가 각각 (1~4)에 대한 작업을 수행한다.
  • 첫 번째 요청은 1초가 걸리지만, 두 번째 요청은 약 2초, 세 번째 요청은 약 3초 정도 걸린다. 왜 지연이 발생할까?
  • 공용 풀의 제한된 병렬성
    • 공용 풀은 병렬 수준(parallelism)이 3으로 설정되어 있어, 최대 3개의 작업만 동시에 처리할 수 있다. 여기에 요청 스레드도 자신의 작업에 참여하므로 각 작업당 총 4개의 스레드만 사용된다.
    • 따라서 총 12개의 요청(각각 4개의 작업)을 처리하는데 필요한 스레드 자원이 부족하다.
  • 처리 시간의 불균형
    • 첫 번째 요청은 거의 모든 공용 풀 워커를 사용할 수 있었지만, 이후 요청들은 제한된 공용 풀 자원을 두고 경쟁해야 한다. 따라서 완료 시간이 점점 느려진다.
  • 스레드 작업 분배: 일부 작업은 요청 스레드에서, 일부는 공용 풀에서 처리된다. 요청 스레드가 도와줘도 공용 풀의 스레드가 부족하여 한계가 있다.

요청이 증가할 수록 문제는 더 심해진다. nThreads를 키워주면, 응답 시간이 확연하게 늘어난다.
이렇듯 공용 풀 병목 현상, 자원 경쟁, 예측 불가능한 성능와 같은 문제점으로 인해, WAS에서 수 많은 요청에 공용 풀을 사용하는 건 위험하다. 실무에서 공용 풀은 절대로 I/O 바운드 작업을 하면 안된다. 스레드가 모두 I/O 작업을 대기하면, Application에서 공용 풀을 사용하는 다른 모든 요청이 밀린다. 공용 풀은 반드시 CPU 바운드 작업에만 사용해야 한다. 애초에 설계도 이 목적이다.


(이런 경우 오히려 parallel()을 제거하는 게 더 빠를 수 있다.)
I/O 바운드 작업이 많을 때 병렬 처리는 스레드를 직접 다루거나, ExecutorService 등을 통해 별도의 스레드 풀을 사용해야 한다.


병렬 스트림 사용시 주의점2:

별도의 스레드 풀로 개선해보자

// logic 처리 전용 스레드 풀
ExecutorService logicPool = Executors.newFixedThreadPool(400);

private static void logic(String requestName, ExecutorService es) { // 로직 풀 받음
    log("[" + requestName + "] START");
    long startTime = System.currentTimeMillis();

    // 1부터 4까지의 작업을 각각 스레드 풀에 제출
    Future<Integer> f1 = es.submit(() -> HeavyJob.heavyTask(1, requestName));
    Future<Integer> f2 = es.submit(() -> HeavyJob.heavyTask(2, requestName));
    Future<Integer> f3 = es.submit(() -> HeavyJob.heavyTask(3, requestName));
    Future<Integer> f4 = es.submit(() -> HeavyJob.heavyTask(4, requestName));

    // Future 결과 취합
    int sum;
    try {
        Integer v1 = f1.get();
        Integer v2 = f2.get();
        Integer v3 = f3.get();
        Integer v4 = f4.get();
        sum = v1 + v2 + v3 + v4;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    long endTime = System.currentTimeMillis();
    log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}
  • 별도의 스레드 풀을 생성하여 병렬 작업 처리에 사용
  • 병렬 스트림을 사용하지 않고 직접 전용 스레드 풀에 작업을 제출한다. 각 작업은 Future로 관리된다.
  • 모든 요청이 약 1초 내로 처리되고, 처리 시간이 일관된다. 요청 지연 문제 해결!
  • 확장성 향상: 400개의 스레드를 가진 풀을 사용함으로써, 동시에 여러 요청을 효율적으로 처리한다.

스트림으로 코드를 개선하면

private static void logic(String requestName, ExecutorService es) {
    log("[" + requestName + "] START");
    long startTime = System.currentTimeMillis();

    // 1부터 4까지의 작업을 각각 스레드 풀에 제출하고 Future 리스트 생성
    List<Future<Integer>> futures = IntStream.rangeClosed(1, 4)
            .mapToObj(i -> es.submit(() -> HeavyJob.heavyTask(i, requestName)))
            .toList();

    // Future 결과 취합 (Stream API 사용)
    int sum = futures.stream()
            .mapToInt(f -> {
                try {
                    return f.get(); // Future 결과 가져오기 (블로킹)
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).sum();

    long endTime = System.currentTimeMillis();
    log("[" + requestName + "] time: " + (endTime - startTime) + "ms, sum: " + sum);
}

CompletableFuture와 주의 사항

특히 실무에서 자주 하는 실수가 병렬 스트림을 I/O 대기 작업에 사용하거나, 또는 CompletableFuture를 사용할 때 발생한다

  • 실무에서 복잡한 멀티스레드 코드를 작성할 때는 CompletableFuture 가 도움이 된다.
  • CompletableFuture는 스레드 요청 처리를 스트림처럼 메서드 체인, 람다를 이용해서 편하게 쓸 수 있게 해준다.
  • CompletableFuture를 생성할 때는 별도의 스레드 풀을 반드시 지정해야 한다. 그렇지 않으면 Fork/Join 공용 풀이 대신 사용된다. 이 때문에 많은 장애가 발생한다. CompletableFuture를 사용할 때는 반드시! 커스텀 풀을 지정해서 사용하자!
public static void main(String[] args) {
    // 기본: ForkJoinPool.commonPool() 사용
    CompletableFuture.runAsync(() -> log("Fork/Join")); // Fork/Join 공용 풀

    // ExecutorService 직접 지정: 전용 스레드 풀 사용
    ExecutorService es = Executors.newFixedThreadPool(100);
    CompletableFuture.runAsync(() -> log("Custom Pool"), es); // 별도의 풀
    es.close(); // ExecutorService 종료
}

Ref) 김영한의 실전 자바 - 고급 3편, 람다, 스트림, 함수형 프로그래밍 강의 | 김영한 - 인프런

'Java > Modern Java(8~)' 카테고리의 다른 글

[Java] 67. 디폴트 메서드  (0) 2025.07.04
[Java] 66. Optional  (1) 2025.07.04
[Java] 65. 스트림 API3 - 컬렉터  (0) 2025.07.04
[Java] 64. 스트림 API2 - 기능  (0) 2025.07.04
[Java] 63. 스트림 API1 - 기본  (0) 2025.07.04
'Java/Modern Java(8~)' 카테고리의 다른 글
  • [Java] 67. 디폴트 메서드
  • [Java] 66. Optional
  • [Java] 65. 스트림 API3 - 컬렉터
  • [Java] 64. 스트림 API2 - 기능
lumana
lumana
배움을 나누는 공간 https://github.com/bebeis
  • lumana
    Brute force Study
    lumana
  • 전체
    오늘
    어제
    • 분류 전체보기 (457)
      • Software Development (27)
        • Performance (0)
        • TroubleShooting (1)
        • Refactoring (0)
        • Test (8)
        • Code Style, Convetion (0)
        • DDD (0)
        • Software Engineering (18)
      • Java (71)
        • Basic (5)
        • Core (21)
        • Collection (7)
        • 멀티스레드&동시성 (13)
        • IO, Network (8)
        • Reflection, Annotation (3)
        • Modern Java(8~) (12)
        • JVM (2)
      • Spring (53)
        • Framework (12)
        • MVC (23)
        • Transaction (3)
        • AOP (11)
        • Boot (0)
        • AI (0)
      • DB Access (1)
        • Jdbc (1)
        • JdbcTemplate (0)
        • JPA (14)
        • Spring Data JPA (0)
        • QueryDSL (0)
      • Computer Science (130)
        • Data Structure (27)
        • OS (14)
        • Database (10)
        • Network (21)
        • 컴퓨터구조 (6)
        • 시스템 프로그래밍 (23)
        • Algorithm (29)
      • HTTP (8)
      • Infra (1)
        • Docker (1)
      • 프로그래밍언어론 (15)
      • Programming Language(Sub) (77)
        • Kotlin (1)
        • Python (25)
        • C++ (51)
        • JavaScript (0)
      • FE (11)
        • HTML (1)
        • CSS (9)
        • React (0)
        • Application (1)
      • Unix_Linux (0)
        • Common (0)
      • PS (13)
        • BOJ (7)
        • Tip (3)
        • 프로그래머스 (0)
        • CodeForce (0)
      • Book Review (4)
        • Clean Code (4)
      • Math (3)
        • Linear Algebra (3)
      • AI (7)
        • DL (0)
        • ML (0)
        • DA (0)
        • Concepts (7)
      • 프리코스 (4)
      • Project Review (6)
      • LegacyPosts (11)
      • 모니터 (0)
      • Diary (0)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.4
lumana
[Java] 68. 병렬 스트림
상단으로

티스토리툴바