스레드 풀과 Executor 프레임워크(1)
#Java/멀티스레드
(복습할 때 왠만하면 강의 자료 전체를 보는게 좋긴 할듯)
이전 챕터까지 우리는 Runnable 을 만들고 Thread 를 직접 생성해서 사용했다. 하지만 실무에서 직접 스레드를 생성해서 사용하면 다음과 같은 문제가 있다.
- 스레드 생성 시간으로 인한 성능 문제
- 스레드 관리 문제
Runnable인터페이스의 불편함
1. 스레드 생성 비용으로 인한 성능 문제
스레드를 사용하려면 먼저 스레드를 생성해야 한다. 그런데 스레드는 다음과 같은 이유로 매우 무겁다.
- 메모리 할당: 각 스레드는 자신만의 호출 스택(call stack)을 가지고 있어야 한다. 이 호출 스택은 스레드가 실행되는 동안 사용하는 메모리 공간이다. 따라서 스레드를 생성할 때는 이 호출 스택을 위한 메모리를 할당해야 한다.
- 운영체제 자원 사용: 스레드를 생성하는 작업은 운영체제 커널 수준에서 이루어지며, 시스템 콜(system call)을 통해 처리된다. 이는 CPU와 메모리 리소스를 소모하는 작업이다.
- 운영체제 스케줄러 설정: 새로운 스레드가 생성되면 운영체제의 스케줄러는 이 스레드를 관리하고 실행 순서를 조정해야 한다. 이는 운영체제의 스케줄링 알고리즘에 따라 추가적인 오버헤드가 발생할 수 있다.
- 참고로 스레드 하나는 보통 1MB 이상의 메모리를 사용한다.
스레드를 생성하는 작업은 단순히 자바 객체를 하나 생성하는 것과는 비교할 수 없을 정도로 큰 작업이다. 요청 작업 하나 처리하려고 스레드 하나를 만들면 스레드 생성에 많은 시간이 소요된다.
Solution: 스레드를 만들어두고 재사용하자. 처음 생성된 이후로 스레드는 빠르게 작업을 처리할 수 있다.
2. 스레드 관리 문제
서버의 CPU, 메모리 자원은 한정되어 있기 때문에, 스레드는 무한하게 만들 수 없다.
평소 동시에 100개 정도의 스레드면 충분했는데, 갑자기 10000개의 스레드가 필요한 상황이 된다면 CPU, 메모리 자원이 버티지 못할 것이다.
Solution: 시스템이 버틸 수 있는, 최대 스레드의 수까지만 스레드를 생성할 수 있게 관리해야 한다.
또한, 업데이트 등의 이유로 Application을 종료해야 하는 상황에서, 이미 작업을 처리중인 스레드의 작업을 완료시키고 프로그램을 종료시킨다던지, 인터럽트를 걸어서 급하게 종료시킨다던지, 등등 여러 상황에서 스레드를 관리할 수 있어야 한다.
3. Runnable 인터페이스의 불편함
public interface Runnable {
void run();
}
Runnable 인터페이스는 다음과 같은 이유로 불편하다.
- 반환 값이 없다:
run()메서드는 반환 값을 가지지 않는다. 따라서 실행 결과를 얻기 위해서는 별도의 메커니즘을 사용해야 한다. - 예외 처리:
run()메서드는 체크 예외(checked exception)를 던질 수 없다. 체크 예외의 처리는 메서드 내부에서 처리해야 한다.
반환 값을 받을 수 있고, 예외를 쉽게 다루고, caller에서 스레드의 예외도 받을 수 있으면 좋을 것 같다.
해결
지금까지 설명한 1번, 2번 문제를 해결하려면 스레드를 생성하고 관리하는 풀(Pool)이 필요하다.
- 스레드를 관리하는 스레드 풀(스레드가 모여서 대기하는 수영장 풀 같은 개념)에 스레드를 미리 필요한 만큼 만들어둔다.
- 스레드는 스레드 풀에서 대기하며 쉰다.
- 작업 요청이 온다
- 스레드 풀에서 이미 만들어진 스레드를 하나 조회한다.
- 조회한 스레드1로 작업을 처리한다.
- 스레드1은 작업을 완료한다.
- 스레드는 작업을 완료한 후 종료하는 게 아니라, 다시 스레드 풀에 반납된다. 스레드1은 이후에 재사용될 수 있다.
- 스레드 풀이라는 개념을 사용하면 스레드를 재사용할 수 있어서, 재사용 시 스레드의 생성 시간을 절약할 수 있다.
- 스레드 풀이 스레드를 관리하기 때문에 필요한 만큼만 스레드를 만들 수 있고, 또 관리할 수 있다.
이러한 기능을 하는 스레드 풀을 만들려면, 스레드의 상태도 변경하는 로직을 작성하고, 최대 스레드 개수도 관리하고, 생산자-소비자 문제도 고려해야 한다. 직접 구현하기가 복잡하다. 이런 문제를 해결해주는 것이 바로 자바가 제공하는 Executor 프레임워크이다.
Executor 프레임워크
Executor 프레임워크는 스레드 풀, 스레드 관리, Runnable 의 문제점은 물론이고, 생산자-소비자 문제까지 한방에 해결해준다.
이 프레임워크는 작업 실행의 관리 및 스레드 풀 관리를 효율적으로 처리해서, 개발자가 직접 스레드를 생성하고 관리하는 복잡함을 줄여준다.
Executor 프레임워크의 주요 구성 요소
Executor 인터페이스
package java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}
가장 단순한 작업 실행 인터페이스로, execute(Runnable command) 메서드 하나를 가지고 있다.
ExecutorService 인터페이스 - 주요 메서드
public interface ExecutorService extends Executor, AutoCloseable {
<T> Future<T> submit(Callable<T> task); // Callable: 반환값이 있는 runnable
@Override
default void close(){...}
...
}
Executor인터페이스를 확장해서 작업 제출과 제어 기능을 추가로 제공한다.- 주요 메서드로는
submit(),close()등이 있다. - 더 많은 기능이 있지만 나머지 기능들은 뒤에서 알아보자.
- Executor 프레임워크를 사용할 때는 대부분 이 인터페이스를 사용한다.
ExecutorService 인터페이스의 기본 구현체는 ThreadPoolExecutor 이다.
ThreadPoolExecutor 몇 가지 메서드
getPoolSize(): 스레드 풀에서 관리되는 스레드의 수를 반환getActiveCount(): 작업을 수행하는 스레드의 수를 반환getQueue().size(): 큐에 대기중인 작업의 수를 반환getCompletedCount(): 완료된 작업의 수
스레드풀의 상태를 출력해주는 유틸리티 메서드를 만들어서 사용할 수 있다.
ExecutorService 예제로 동작 살피기
작업 시간이 1초 걸리는 RunnableTask 클래스를 만들고, ThreadPoolExecutor를 이용하여 작업을 멀티스레드 환경에서 실행해보자.
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(2, 2, 0,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
log("== 초기 상태 ==");
printState(es);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("taskD"));
log("== 작업 수행 중 ==");
printState(es);
sleep(3000);
log("== 작업 수행 완료 ==");
printState(es);
es.close();
log("== shutdown 완료 ==");
printState(es);
}
ThreadPoolExecutor 는 크게 2가지 요소로 구성되어 있다.
- 스레드 풀: 스레드를 관리한다.
BlockingQueue: 작업을 보관한다. 생산자-소비자 문제를 해결하기 위해 단순한 큐가 아니라,BlockingQueue를 사용한다.
생산자가 es.execute(new RunnableTask("taskA")) 를 호출하면, RunnableTask("taskA") 인스턴스가 BlockingQueue 에 보관된다.
- 생산자:
es.execute(작업)를 호출하면 내부에서BlockingQueue에 작업을 보관한다. (main스레드가 생산자가 된다.) - 소비자: 스레드 풀에 있는 스레드가 소비자가 된다. 이후 소비자 중 하나가
BlockingQueue에 들어있는 작업을 꺼내서 처리한다.
스레드들이 작업을 꺼내려고 한다(소비자). 작업이 없으면 기다리다가 작업이 들어오면 깨어나서 실행한다. 생산자-소비자 구조이다.
ThreadPoolExecutor 생성자
new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
ThreadPoolExecutor 의 생성자는 다음 속성을 사용한다.
corePoolSize: 스레드 풀에서 관리되는 기본 스레드의 수- 위 예제에서는 2개
maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수- 위 예제에서는 2개
- 기본 스레드와 최대 스레드 수를 맞추었다. 따라서 풀에서 관리되는 스레드는 2개로 고정된다.
keepAliveTime,TimeUnit unit: 기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기 시간이다.
이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.BlockingQueue workQueue: 작업을 보관할 블로킹 큐- 작업을 보관할 블로킹 큐의 구현체로
LinkedBlockingQueue를 사용했다. 참고로 이 블로킹 큐는 작업을 무한대로 저장할 수 있다.
- 작업을 보관할 블로킹 큐의 구현체로
실행 결과
12:10:54.451 [ main] == 초기 상태 ==
12:10:54.461 [ main] [pool=0, active=0, queuedTasks=0, completedTasks=0]
ThreadPoolExecutor를 생성한 시점에 스레드 풀에 스레드를 미리 만들어두지는 않는다.
main스레드가es.execute("taskA ~ taskD")를 호출한다.- 참고로 당연한 이야기지만
main스레드는 작업을 전달하고 기다리지 않는다. 전달한 작업은 다른 스레드가 실행할 것이다.main스레드는 작업을 큐에 보관까지만 하고 바로 다음 코드를 수행한다.
- 참고로 당연한 이야기지만
taskA~D요청이 블로킹 큐에 들어온다.- 최초의 작업이 들어오면 작업을 처리하기 위해 스레드가 생성된다.
- 참고로 스레드 풀에 스레드를 미리 만들어두지는 않는다.
- 작업이 들어올 때마다
corePoolSize까지 스레드를 만든다.- 예를 들어 최초 작업인
taskA가 들어오는 시점에 스레드1을 생성하고, 다음 작업인taskB가 들어오는 시점에 스레드2를 생성한다. - 이런 방식으로
corePoolSize에 지정한 수 만큼 스레드를 스레드 풀에 만든다. 여기서는 2를 설정했으므로 2개까지 만든다. corePoolSize까지 스레드가 생성되고 나면, 이후에는 스레드를 생성하지 않고 앞서 만든 스레드를 재사용한다.
- 예를 들어 최초 작업인
12:10:54.461 [ main] == 작업 수행 중 ==
12:10:54.461 [ main] [pool=2, active=2, queuedTasks=2, completedTasks=0]
- 스레드 풀에 관리되는 스레드가 2개이므로
pool=2 - 작업을 수행중인 스레드가 2개이므로
active=2 - 큐에 대기중인 작업이 2개이므로
queuedTasks=2 - 완료된 작업은 없으므로
completedTasks=0
스레드를 실제로 꺼내는 것은 아니고, 스레드의 상태가 변경된다고 이해하면 된다. 그래서 여전히 pool=2 로 유지된다.
- 작업이 완료되면 스레드 풀에 스레드를 반납한다. 스레드를 반납하면 스레드는 대기(
WAITING) 상태로 스레드 풀에 대기한다.- 참고로 실제 반납되는 게 아니라, 스레드의 상태가 변경된다고 이해하면 된다.
- 반납된 스레드는 재사용된다.
taskC,taskD의 작업을 처리하기 위해 스레드 풀에서 스레드를 꺼내 재사용한다.
- 작업이 완료되면 스레드는 다시 스레드 풀에서 대기한다.
12:10:57.465 [ main] == 작업 수행 완료 ==
12:10:57.466 [ main] [pool=2, active=0, queuedTasks=0, completedTasks=4]
12:10:57.468 [ main] == shutdown 완료 ==
12:10:57.469 [ main] [pool=0, active=0, queuedTasks=0, completedTasks=4]
close() 를 호출하면 ThreadPoolExecutor 가 종료된다. 이때 스레드 풀에 대기하는 스레드도 함께 제거된다.
참고로 close() 는 자바 19부터 지원되는 메서드이다. 만약 19 미만 버전을 사용한다면 shutdown() 을 호출하자. 둘의 차이는 뒤에서 설명한다.
위에서 봤던 1, 2번 문제점을 해결했다.
Runnable 불편함 해결
Runnable의 불편함(반환값X, 예외처리) 또한 해결해보자.
기존에는 작업의 결과를 반환받기 위해 Runnable을 구현한 객체의 필드에 결과를 저장해두고 해당 작업이 끝날 때 까지 요청 스레드는 join() 으로 대기하고, 작업이 끝나고 인스턴스 필드를 통해 값을 획득했다.
너무 복잡하다. 작업 스레드는 간단히 값을 return 을 통해 반환하고, 요청 스레드는 그 반환 값을 바로 받을 수 있다면 코드가 훨씬 더 간결해질 것이다.
이런 문제를 해결하기 위해 Executor 프레임워크는 Callable 과 Future 라는 인터페이스를 도입했다
Future
Runnable과 Callable 비교
Runnable은 다음과 같다.
package java.lang;
public interface Runnable {
void run();
}
Runnable의run()은 반환 타입이void이다. 따라서 값을 반환할 수 없다.- 예외가 선언되어 있지 않다. 따라서 해당 인터페이스를 구현하는 모든 메서드는 체크 예외를 던질 수 없다.
Callable은 다음과 같다.
package java.util.concurrent;
public interface Callable<V> {
V call() throws Exception;
}
java.util.concurrent에서 제공되는 기능이다.Callable의call()은 반환 타입이 제네릭V이다. 따라서 값을 반환할 수 있다.throws Exception예외가 선언되어 있다. 따라서 해당 인터페이스를 구현하는 모든 메서드는 체크 예외인Exception과 그 하위 예외를 모두 던질 수 있다.
Callable 을 사용해서 Runnable의 문제점을 해결해보자.
Callable과 Future 사용
public class CallableMainV1 {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
log("result value = " + result);
es.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() {
log("Callable 시작");
sleep(2000);
int value = new Random().nextInt(10);
log("create value = " + value);
log("Callable 완료");
return value;
}
}
}
먼저 MyCallable 을 구현하는 부분을 보자.
- 숫자를 반환하므로 반환할 제네릭 타입을
<Integer>로 선언했다. - 구현은
Runnable코드와 비슷한데, 유일한 차이는 결과를 필드에 담아두는 것이 아니라, 결과를 반환한다는 점이다. 따라서 결과를 보관할 별도의 필드를 만들지 않아도 된다.
스레드풀 생성 - 기존 코드
ExecutorService es = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
스레드풀 생성 - 편의 코드
ExecutorService es = Executors.newFixedThreadPool(1);
java.util.concurrent.Executors 가 제공하는 newFixedThreadPool(size) 을 사용하면 편리하게 ExecutorService 를 생성할 수 있다.
submit()
<T> Future<T> submit(Callable<T> task); // 인터페이스 정의
ExecutorService 가 제공하는 submit() 을 통해 Callable 작업을 전달할 수 있다.
Future<Integer> future = es.submit(new MyCallable());
MyCallable 인스턴스가 블로킹 큐에 전달되고, 스레드 풀의 스레드 중 하나가 이 작업을 실행할 것이다.
이때 작업의 처리 결과는 직접 반환되는 것이 아니라 Future 라는 특별한 인터페이스를 통해 반환된다.
Integer result = future.get();
future.get() 을 호출하면 MyCallable 의 call() 이 반환한 결과를 받을 수 있다.
참고로 Future.get() 은 InterruptedException, ExecutionException 체크 예외를 던진다. 이 예제에서는 일단 밖으로 던졌다. 나중에 다시 다룬다.
Executor 프레임워크의 강점
요청 스레드가 결과를 받아야 하는 상황이라면, Callable 을 사용한 방식은 Runnable 을 사용하는 방식보다 훨씬 편리하다. 코드만 보면 복잡한 멀티스레드를 사용한다는 느낌보다는, 단순한 싱글 스레드 방식으로 개발한다는 느낌이 들 것이다.
이 과정에서 내가 스레드를 생성하거나, join() 으로 스레드를 제어하거나 한 코드는 전혀 없다. 심지어 Thread 라는 코드도 없다. 단순하게 ExecutorService 에 필요한 작업을 요청하고 결과를 받아서 쓰면 된다!
하지만 편리한 것은 편리한 것이고, 기반 원리를 제대로 이해해야 문제없이 사용할 수 있다.
여기서 잘 생각해보면 한 가지 애매한 상황이 있다.
Future<Integer> future = es.submit(new MyCallable());
// 메인 메서드는 es.submit()이 완료될 때 까지 기다리지 않고 바로 다음 라인을 실행한다.
Integer result = future.get(); // 과연 작업을 끝내고 값을 반환했을까?
future.get() 을 호출하는 요청 스레드( main )는 future.get() 을 호출 했을 때 2가지 상황으로 나뉘게 된다.
MyCallable작업을 처리하는 스레드 풀의 스레드가 작업을 완료했다.MyCallable작업을 처리하는 스레드 풀의 스레드가 아직 작업을 완료하지 못했다.
future.get() 을 호출했을 때 스레드 풀의 스레드가 작업을 완료했다면 반환 받을 결과가 있을 것이다. 그런데 아직 작업을 처리중이라면 어떻게 될까?
이런 의문도 들 것이다. 왜 결과를 바로 반환하지 않고, 불편하게 Future 라는 객체를 대신 반환할까? 이 부분을 제대로 이해해야 한다.
Future2 - 분석
Future 는 번역하면 미래라는 뜻이고, 여기서는 미래의 결과를 받을 수 있는 객체라는 뜻이다.
그렇다면 누구의 미래의 결과를 말하는 것일까?
Future<Integer> future = es.submit(new MyCallable());
submit()은MyCallable.call()이 반환하는 무작위 숫자 대신에Future를 반환한다.- 생각해보면
MyCallable이 즉시 실행되어서 즉시 결과를 반환하는 것은 불가능하다. 왜냐하면MyCallable은 즉시 실행되는 것이 아니라, 스레드 풀의 다른 스레드가 미래의 어떤 시점에 이 코드를 대신 실행해야 하기 때문이다. - 따라서 결과를 즉시 받는 것은 불가능하다. 이런 이유로
es.submit()은MyCallable의 결과를 반환하는 대신에MyCallable의 결과를 나중에 받을 수 있는Future라는 객체를 대신 제공한다. - 정리하면
Future는 전달한 작업의 미래이다. 이 객체를 통해 전달한 작업의 미래 결과를 받을 수 있다.
단순하게 정리하면, Future 는 전달한 작업의 미래 결과를 담고 있다고 생각하면 된다.
그러면 이 Future 객체는 어떻게 하길래 스레드 실행 결과를 문제 없이 반환할 수 있는 것일까?
아래 코드를 봐보자.
ExecutorService es = Executors.newFixedThreadPool(1);
log("submit() 호출");
Future<Integer> future = es.submit(new MyCallable());
log("future 즉시 반환, future = " + future);
log("future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING");
Integer result = future.get();
log("future.get() [블로킹] 메서드 호출 완료 -> , main 스레드 RUNNABLE");
log("result value = " + result);
log("future 완료, future = " + future);
es.close();
09:24:42.689 [ main] submit() 호출
09:24:42.691 [pool-1-thread-1] Callable 시작
09:24:42.691 [ main] future 즉시 반환, future = FutureTask@46d56d67[Not completed, task = thread.executor.future.CallableMainV2$MyCallable@14acaea5]
09:24:42.691 [ main] future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING
09:24:44.703 [pool-1-thread-1] create value = 4
09:24:44.703 [pool-1-thread-1] Callable 완료
09:24:44.703 [ main] future.get() [블로킹] 메서드 호출 완료 -> , main 스레드 RUNNABLE
09:24:44.704 [ main] result value = 4
09:24:44.704 [ main] future 완료, future = FutureTask@46d56d67[Completed normally]
MyCallable인스턴스를 편의상taskA라고 하겠다.- 편의상 스레드 풀에 스레드가 1개 있다고 가정하겠다.
es.submit(new MyCallable())
09:24:42.689 [ main] submit() 호출, [논블로킹] 메서드
submit()을 호출해서ExecutorService에taskA를 전달한다.
Future의 생성
- 요청 스레드는
es.submit(taskA)를 호출하는 중이다. ExecutorService는 전달한taskA의 미래 결과를 알 수 있는Future객체를 생성한다.Future는 인터페이스이다. 이때 생성되는 실제 구현체는FutureTask이다.
- 그리고 생성한
Future객체 안에taskA의 인스턴스를 보관한다. Future는 내부에taskA작업의 완료 여부와, 작업의 결과 값을 가진다.
submit() 을 호출한 경우 Future 가 만들어지고, 전달한 작업인 taskA 가 바로 블로킹 큐에 담기는 것이 아니라, 그림처럼taskA 를 감싸고 있는 Future 가 대신 블로킹 큐에 담긴다.
Future<Integer> future = es.submit(new MyCallable());
09:24:42.691 [ main] future 즉시 반환, future = FutureTask@46d56d67[Not
completed, task = thread.executor.future.CallableMainV2$MyCallable@14acaea5]
Future는 내부에 작업의 완료 여부와, 작업의 결과 값을 가진다. 작업이 완료되지 않았기 때문에 아직은 결과 값이 없다.- 로그를 보면
Future의 구현체는FutureTask이다. Future의 상태는 "Not completed"(미 완료)이고, 연관된 작업은 전달한taskA(MyCallable 인스턴스)이다.
- 로그를 보면
- 여기서 중요한 핵심이 있는데, 작업을 전달할 때 생성된
Future는 즉시 반환된다는 점이다.
다음 로그를 보자.
09:24:42.691 [ main] future 즉시 반환, future = FutureTask@46d56d67[Not completed, task = thread.executor.future.CallableMainV2$MyCallable@14acaea5]
09:24:42.691 [ main] future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING
- 생성한
Future를 즉시 반환하기 때문에 요청 스레드는 대기하지 않고, 자유롭게 본인의 다음 코드를 호출할 수 있다.- 이것은 마치
Thread.start()를 호출한 것과 비슷하다.Thread.start()를 호출하면 스레드의 작업 코드가 별도의 스레드에서 실행된다. 요청 스레드는 대기하지 않고, 즉시 다음 코드를 호출할 수 있다.
- 이것은 마치
09:24:42.691 [pool-1-thread-1] Callable 시작
- 큐에 들어있는
Future[taskA]를 꺼내서 스레드 풀의 스레드1이 작업을 시작한다. - 참고로
Future의 구현체인FutureTask는Runnable인터페이스도 함께 구현하고 있다. - 스레드1은
FutureTask의run()메서드를 수행한다. - 그리고
run()메서드가taskA의call()메서드를 호출하고 그 결과를 받아서 처리한다.FutureTask.run()MyCallable.call()
09:24:42.691 [ main] future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING
스레드1
- 스레드1은
taskA의 작업을 아직 처리중이다. 아직 완료하지는 않았다.
요청 스레드
- 요청 스레드는
Future인스턴스의 참조를 가지고 있다. - 그리고 언제든지 본인이 필요할 때
Future.get()을 호출해서taskA작업의 미래 결과를 받을 수 있다. - 요청 스레드는 작업의 결과가 필요해서
future.get()을 호출한다.Future에는 완료 상태가 있다.taskA의 작업이 완료되면Future의 상태도 완료로 변경된다.- 그런데 여기서
taskA의 작업이 아직 완료되지 않았다. 따라서Future도 완료 상태가 아니다.
- 요청 스레드가
future.get()을 호출하면Future가 완료 상태가 될 때 까지 대기한다. 이때 요청 스레드의 상태는RUNNABLEWAITING이 된다.
future.get() 을 호출했을 때
- Future가 완료 상태:
Future가 완료 상태면Future에 결과도 포함되어 있다. 이 경우 요청 스레드는 대기하지 않고, 값을 즉시 반환받을 수 있다. - Future가 완료 상태가 아님:
taskA가 아직 수행되지 않았거나 또는 수행 중이라는 뜻이다. 이때는 어쩔 수 없이 요청 스레드가 결과를 받기 위해 대기해야 한다. 요청 스레드가 마치 락을 얻을 때처럼, 결과를 얻기 위해 대기한다. 이처럼 스레드가 어떤 결과를 얻기 위해 대기하는 것을 블로킹(Blocking)이라 한다.
참고: 블로킹 메서드
Thread.join() , Future.get() 과 같은 메서드는 스레드가 작업을 바로 수행하지 않고, 다른 작업이 완료될 때 까지 기다리게 하는 메서드이다. 이러한 메서드를 호출하면 호출한 스레드는 지정된 작업이 완료될 때까지 블록(대기)되어 다른 작업을 수행할 수 없다.
09:24:44.703 [pool-1-thread-1] create value = 4
09:24:44.703 [pool-1-thread-1] Callable 완료
09:24:44.703 [ main] future.get() [블로킹] 메서드 호출 완료 -> , main 스레드
RUNNABLE
요청 스레드
- 대기(
WAITING) 상태로future.get()을 호출하고 대기중이다
스레드1
taskA작업을 완료한다.Future에taskA의 반환 결과를 담는다.Future의 상태를 완료로 변경한다.- 요청 스레드를 깨운다. 요청 스레드는
WAITINGRUNNABLE상태로 변한다.- Future(스레드1)은 작업 완료를 기다리는 스레드를 담아두기 때문에 깨울 수 있다.
09:24:44.703 [ main] future.get() [블로킹] 메서드 호출 완료 -> , main 스레드 RUNNABLE
09:24:44.704 [ main] result value = 4
요청 스레드
- 요청 스레드는
RUNNABLE상태가 되었다. 그리고 완료 상태의Future에서 결과를 반환 받는다. 참고로taskA의 결과가Future에 담겨있다.
스레드1
- 작업을 마친 스레드1은 스레드 풀로 반환된다.
RUNNABLEWAITING
09:24:44.704 [ main] future 완료, future =
java.util.concurrent.FutureTask@46d56d67[Completed normally]
Future 의 인스턴스인 FutureTask 를 보면 "Completed normally"로 정상 완료된 것을 확인할 수 있다.
Future가 필요한 이유?
Future를 반환 하는 코드
Future<Integer> future = es.submit(new MyCallable()); // 여기는 블로킹 아님
future.get(); // 여기서 블로킹
ExecutorService 를 설계할 때 지금처럼 복잡하게 Future 를 반환하는게 아니라 다음과 같이 결과를 직접 받도록
설계하는게 더 단순하고 좋지 않았을까?
결과를 직접 반환 하는 코드(가정)
Integer result = es.submit(new MyCallable()); // 여기서 블로킹
물론 이렇게 설계하면 submit() 을 호출할 때, 작업의 결과가 언제 나올지 알 수 없다. 따라서 작업의 결과를 받을 때 까지 요청 스레드는 대기해야 한다. 그런데 이것은 Future 를 사용할 때도 마찬가지다. Future 만 즉시 반환 받을 뿐이지, 작업의 결과를 얻으려면 결국 future.get() 을 호출해야 한다. 그리고 이 시점에는 작업의 결과를 받을 때 까지 대기해야 한다.
(= 논블록으로 future 받고 future.get()으로 블로킹되거나, 그냥 바로 리턴값을 받기위해 블로킹되는거나 똑같은 거 아니냐? 왜 이렇게 future라는 걸 만들어서 분리해서 쓰냐?)
다음 예제 코드를 보면 그 이유를 바로 확인할 수 있다.
1~100 까지 더하는 경우를 스레드를 사용해서 1~50 , 51~100 으로 나누어 처리해보자.
SumTask task1 = new SumTask(1, 50);
SumTask task2 = new SumTask(51, 100);
ExecutorService es = Executors.newFixedThreadPool(2);
Future<Integer> future1 = es.submit(task1);
Future<Integer> future2 = es.submit(task2);
Integer sum1 = future1.get();
Integer sum2 = future2.get();
log("task1.result=" + sum1);
log("task2.result=" + sum2);
int sumAll = sum1 + sum2;
log("task1 + task2 = " + sumAll);
log("End");
es.close();
만약에 Future 없이 결과를 직접 반환하는 코드였다면 (가정, 이런 코드는 실제로는 없음)
Integer sum1 = es.submit(task1); // 여기서 블로킹
Integer sum2 = es.submit(task2); // 여기서 블로킹
마치 단일 스레드가 작업을 하는 것처럼 동작하게 된다.
Future를 반환한다면
요청 스레드는 task1 , task2 를 동시에 요청할 수 있다. 따라서 두 작업은 동시에 수행되고, task1의 결과를 받기 위해 2초 대기하고, 그 다음 task2의 작업 결과를 받으려고 하는 시점에 작업 스레드2는 이미 2초간 작업을 완료했으므로 future2.get() 은 거의 즉시 결과를 반환한다.
정리하자면 요청과 작업 결과 획득을 분리해서 동시에 작업을 실행할 수 있도록 한다고 보면 된다.
또한, 아래와 같이 Future를 활용해서는 안 된다.
Future를 잘못 활용한 예1
Future<Integer> future1 = es.submit(task1); // non-blocking
Integer sum1 = future1.get(); // blocking, 2초 대기
Future<Integer> future2 = es.submit(task2); // non-blocking
Integer sum2 = future2.get(); // blocking, 2초 대기
Future를 잘못 활용한 예2
Integer sum1 = es.submit(task1).get(); // get()에서 블로킹
Integer sum2 = es.submit(task2).get(); // get()에서 블로킹
위 두 케이스 모두 2초 대기를 2번 하기 때문에 총 4초의 시간이 걸린다.
정리
Future라는 개념이 없다면 결과를 받을 때까지 요청 스레드는 아무 일도 못하고 대기해야 한다. 따라서 다른 작업을 동시에 수행할 수도 없다.Future덕분에 요청 스레드는 대기하지 않고 다른 작업(예: 다른 요청)을 수행할 수 있다. 예를 들어서 다른 작업을 더 요청할 수 있다. 그리고 모든 작업 요청이 끝난 다음에 필요할 때Future.get()을 호출하여 최종 결과를 받을 수 있다.Future를 사용하는 경우 결과적으로task1,task2를 동시에 요청할 수 있다. 두 작업을 바로 요청했기 때문에 작업을 동시에 제대로 수행할 수 있다.
Future가 제공하는 기능
Future 는 작업의 미래 계산 결과를 나타내며, 계산이 완료되었는지 확인하고, 완료될 때까지 기다릴 수 있는 기능을 제공한다.
Future 인터페이스
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
enum State {
RUNNING,
SUCCESS,
FAILED,
CANCELLED
}
default State state() {}
}
주요 메서드
boolean cancel(boolean mayInterruptIfRunning)
- 기능: 아직 완료되지 않은 작업을 취소한다.
- 매개변수:
mayInterruptIfRunningcancel(true):Future를 취소 상태로 변경하며, 작업이 실행 중이라면Thread.interrupt()를 호출해서 작업을 중단한다.cancel(false):Future를 취소 상태로 변경하되, 실행 중인 작업은 중단하지 않는다.
- 반환값: 작업이 성공적으로 취소된 경우
true, 이미 완료되었거나 취소할 수 없는 경우false - 설명: 작업이 실행 중이 아니거나 아직 시작되지 않았으면 취소하고, 실행 중인 작업의 경우
mayInterruptIfRunning이true이면 중단을 시도한다. - 참고: 취소 상태의
Future에Future.get()을 호출하면CancellationException런타임 예외가 발생한다.
cancel(true)를 사용하면 실행중인 작업에 인터럽트가 발생해서 실행중인 작업을 중지 시도한다.
cancel(false)를 사용하면 Future의 상태만 CANCEL로 변경하고, 실행중인 작업은 그냥 둔다.
두 케이스 모두 future.get()을 사용하면 CancellationException 런타임 예외가 발생한다.
boolean isCancelled()
- 기능: 작업이 취소되었는지 여부를 확인한다.
- 반환값: 작업이 취소된 경우
true, 그렇지 않으면false - 설명: 이 메서드는 작업이
cancel()메서드에 의해 취소된 경우에true를 반환한다.
boolean isDone()
- 기능: 작업이 완료되었는지 여부를 확인한다.
- 반환값: 작업이 완료된 경우
true, 그렇지 않은 경우false - 설명: 작업이 정상적으로 완료되었거나, 취소되었거나, 예외가 발생하여 종료된 경우에
true를 반환한다.
State state()
- 기능:
Future의 상태를 반환한다. 자바 19부터 지원한다.RUNNING: 작업 실행 중SUCCESS: 성공 완료FAILED: 실패 완료CANCELLED: 취소 완료
V get()
- 기능: 작업이 완료될 때까지 대기하고, 완료되면 결과를 반환한다.
- 반환값: 작업의 결과
- 예외:
InterruptedException: 대기 중 현재 스레드가 인터럽트된 경우 발생ExecutionException: 작업 계산 중 예외가 발생한 경우 발생
- 설명: 작업이 완료될 때까지
get()을 호출한 현재 스레드를 대기(블록킹)한다. 작업이 완료되면 결과를 반환한다.
V get(long timeout, TimeUnit unit)
- 기능:
get()과 같으나, 시간 초과 시 예외를 발생시킨다. - 매개변수:
timeout: 대기할 최대 시간unit: timeout매개변수의 시간 단위 지정
- 반환값: 작업의 결과
- 예외:
InterruptedException: 대기 중에 현재 스레드가 인터럽트된 경우 발생ExecutionException: 계산 중에 예외가 발생한 경우 발생TimeoutException: 주어진 시간 내에 작업이 완료되지 않은 경우 발생
- 설명: 지정된 시간 동안 결과를 기다린다. 시간이 초과되면
TimeoutException을 발생시킨다.
Future - 예외
Future.get() 을 호출하면 작업의 결과뿐만 아니라, 작업 중에 발생한 예외도 받을 수 있다. (기존에 학습했던 Runnable은 예외를 던질 수 없어서 요런 게 불가능했다)
public class FutureExceptionMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
log("작업 전달");
Future<Integer> future = es.submit(new ExCallable());
sleep(1000); // 잠시 대기
try {
log("future.get() 호출 시도, future.state(): " + future.state());
Integer result = future.get();
log("result value = " + result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
log("e = " + e);
Throwable cause = e.getCause(); // 원본 예외
log("cause = " + cause);
}
es.close();
}
static class ExCallable implements Callable<Integer> {
@Override
public Integer call() {
log("Callable 실행, 예외 발생");
throw new IllegalStateException("ex!");
}
}
}
실행 결과
15:05:04.460 [ main] 작업 전달
15:05:04.463 [pool-1-thread-1] Callable 실행, 예외 발생
15:05:05.471 [ main] future.get() 호출 시도, future.state(): FAILED
15:05:05.472 [ main] e = java.util.concurrent.ExecutionException: java.lang.IllegalStateException: ex!
15:05:05.473 [ main] cause = java.lang.IllegalStateException: ex!
- 요청 스레드:
es.submit(new ExCallable())을 호출해서 작업을 전달한다. - 작업 스레드:
ExCallable을 실행하는데,IllegalStateException예외가 발생한다.- 작업 스레드는
Future에 발생한 예외를 담아둔다. 참고로 예외도 객체이다. 잡아서 필드에 보관할 수 있다. - 예외가 발생했으므로
Future의 상태는FAILED가 된다.
- 작업 스레드는
- 요청 스레드: 결과를 얻기 위해
future.get()을 호출한다.Future의 상태가FAILED면ExecutionException예외를 던진다.- 이 예외는 내부에 앞서
Future에 저장해둔IllegalStateException을 포함하고 있다. e.getCause()을 호출하면 작업에서 발생한 원본 예외를 받을 수 있다.
Future.get() 은 작업의 결과 값을 받을 수도 있고, 예외를 받을 수도 있다. 마치 싱글 스레드 상황에서 일반적인 메서드를 호출하는 것 같다. Executor 프레임워크가 얼마나 잘 설계되어 있는지 알 수 있는 부분이다.
ExecutorService - 작업 컬렉션 처리
ExecutorService 는 여러 작업을 한 번에 편리하게 처리하는 invokeAll(), invokeAny() 기능을 제공한다.
invokeAll()
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException- 모든
Callable작업을 제출하고, 모든 작업이 완료될 때까지 기다린다.
- 모든
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException- 지정된 시간 내에 모든
Callable작업을 제출하고 완료될 때까지 기다린다.
- 지정된 시간 내에 모든
invokeAny()
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException- 하나의
Callable작업이 완료될 때까지 기다리고, 가장 먼저 완료된 작업의 결과를 반환한다. - 완료되지 않은 나머지 작업은 취소한다.
- 하나의
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException- 지정된 시간 내에 하나의
Callable작업이 완료될 때까지 기다리고, 가장 먼저 완료된 작업의 결과를 반환한다. - 완료되지 않은 나머지 작업은 취소한다.
- 지정된 시간 내에 하나의
invokeAll(), invokeAny() 를 사용하면 한꺼번에 여러 작업을 요청할 수 있다. 둘의 차이를 코드로 알아보자
invokeAll()
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000); // 1초 소요
CallableTask task2 = new CallableTask("task2", 2000); // 2초 소요
CallableTask task3 = new CallableTask("task3", 3000); // 3초 소요
List<CallableTask> tasks = List.of(task1, task2, task3);
List<Future<Integer>> futures = es.invokeAll(tasks);
for (Future<Integer> future : futures) {
Integer value = future.get();
log("value = " + value);
}
es.close();
task1, task2, task3이 모두 완료된다. 총 소요시간은 3초이다.
ex) 컴퓨터 자원이 남아돌아서 여러군데 호출했는데, 결과가 전부 필요하면 요걸 쓴다.
invokeAny()
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000);
CallableTask task2 = new CallableTask("task2", 2000);
CallableTask task3 = new CallableTask("task3", 3000);
List<CallableTask> tasks = List.of(task1, task2, task3);
Integer value = es.invokeAny(tasks);
log("value = " + value);
es.close();
가장 빨리 작업이 완료된 task1의 결과만 저장되고, 완료되지 않은 나머지 작업에서는 인터럽트가 발생하고 작업이 취소된다.
ex) 컴퓨터 자원이 남아돌아서 여러군데 호출했는데, 결과가 빨리 온 것을 고객에게 전달해야하는 경우 요걸 쓰면 된다.
'Java > 멀티스레드&동시성' 카테고리의 다른 글
| [Java] 45. 스레드 풀과 Executor 프레임워크(2) (0) | 2025.07.01 |
|---|---|
| [Java] 43. 동시성 컬렉션 (0) | 2025.07.01 |
| [Java] 42. CAS - 동기화와 원자적 연산 (0) | 2025.07.01 |
| [Java] 41. 생산자 소비자 문제(2) (0) | 2025.07.01 |
| [Java] 40. 생산자 소비자 문제(1) (0) | 2025.07.01 |