스레드 풀의 필요성
스레드 풀을 사용하면 스레드를 재사용할 수 있어서, 스레드 생성 시간을 절약할 수 있다. 또한 생성되는 최대 스레드 수가 제한되어 무분별하게 스레드가 생성되어 자원을 낭비하는 것을 막을 수 있다.
스레드 풀을 직접 구현하려면 다음과 같은 것들을 고려해야 하기 때문에 매우 까다롭다.
- 스레드가 처리할 작업이 없으면 스레드의 상태를 WAITING 상태로 관리하고, 작업 요청이 오면 RUNNABLE 상태로 변경해야 한다.
- 어떤 생산자가 작업을 만들고, 스레드 풀에 있는 스레드가 소비자가 된다. 따라서 생산자-소비자 문제를 고려해야 한다.
자바가 제공하는 Executor 프레임워크는 스레드 풀과 스레드를 관리해주며, Runnable의 문제점과 생산자-소비자 문제까지 해결해준다. 실무에서는 스레드를 직업 하나하나 생성하여 사용하기보다 Executor 프레임워크를 주로 사용한다.
Executor, ExecutorService 인터페이스
Executor와 ExecutorService는 스레드 풀 구현을 위한 인터페이스이다.
- Executor 인터페이스는 가장 단순한 작업 실행 인터페이스로, execute(Runnable command) 메서드 하나를 가지고 있다.
- ExecutorService 인터페이스는 Executor 인터페이스를 확장해서 작업 제출과 제어 기능을 추가로 제공한다. 주요 메서드로는 submit(), close()가 있다. Executor 프레임워크를 사용할 때는 대부분 이 인터페이스를 사용한다.
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor, AutoCloseable {
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
@Override
default void close() {...}
...
}
ThreadPoolExecutor
ExecutorService의 대표적인 구현체로 ThreadPoolExecutor가 있다. ThreadPoolExecutor는 크게 2가지 요소로 구성되어 있다.
- 스레드 풀
- BlockingQueue: 작업을 보관하는 곳이다. 생산자-소비자 문제를 해결하기 위해 단순 큐가 아닌 BlockingQueue가 사용된다.
ThreadPoolExecutor에서의 생산자-소비자
다음 예제를 보자. ThreadPoolExecutor를 만들고, executorService.execute() 메서드를 호출했다. 즉, executorService.execute(작업)를 호출하면 작업이 BlockingQueue에 보관되고, 스레드 풀의 스레드가 BlockingQueue의 작업 중 하나를 받아서 처리한다.
public class Example {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
executorService.execute(new RunnableTask("taskA"));
executorService.execute(new RunnableTask("taskB"));
executorService.close();
}
static class RunnableTask implements Runnable {
String taskName;
public RunnableTask(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
try {
Thread.sleep(1000); // 작업 시간 1초
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
- 생산자: executorService.execute(작업)를 호출하면, BlockingQueue에 작업이 보관된다. 즉, main 스레드가 BlockingQueue의 생산자가 된다.
- 소비자: 스레드 풀에 있는 스레가 BlockingQueue의 소비자가 된다. 여러 작업 중 하나를 받아 처리한다.
ThreadPoolExecutor 생성자
ThreadPoolExecutor 생성자는 다음 속성을 사용한다.
- corePoolSize: 스레드 풀에서 관리되는 기본 스레드 수
- maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수
- keepAliveTime: 기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기 시간, 이 시간 동안 처리할 작업이 없으면 초과 스레드는 제거됨
- workQueue: 작업을 보관할 블로킹 큐
그리고 executorService.close() 메서드를 호출하면 ThreadPoolExecutor가 종료되고 스레드 풀에 대기하는 스레드도 함께 제거된다.
Callable과 Future 인터페이스
Runnable의 불편함
자바에서 스레드를 생성하려면 Thread 클래스를 상속받거나 Runnable 인터페이스를 구현해야 한다. 그리고 두 방법 중 Runnable 인터페이스를 구현하는 방법이 더 권장된다.
@FunctionalInterface
public interface Runnable {
void run();
}
그런데 Runnable 인터페이스는 다음과 같은 불편함이 있다.
- 반환 값이 없다: run() 메서드는 반환 값을 가지지 않는다. 따라서 실행 결과를 얻기 위해 별도의 메커니즘을 사용해야 한다.
- 체크 예외를 던질 수 없다: run() 메서드는 예외를 던지지 않는다. 따라서 run() 메서드를 재정의할 때, 체크 예외를 던질 수 없으며 메서드 내부에서 처리해야 한다.
예를 들어, 다음 방식은 스레드 실행 결과를 별도의 멤버 변수에 넣어두고, join()을 통해 스레드가 종료되길 기다린 후 멤버 변수 값을 가져와 사용하는 방식이다.
public class Example {
public static void main(String[] args) throws InterruptedException {
SumTask task1 = new SumTask(1, 50);
SumTask task2 = new SumTask(51, 100);
Thread thread1 = new Thread(task1, "thread-1");
Thread thread2 = new Thread(task2, "thread-2");
thread1.start();
thread2.start();
thread1.join(); // thread1이 TERMINATED 상태가 될 때까지 대기
thread2.join(); // thread2가 TERMINATED 상태가 될 때까지 대기
System.out.println("sum result = " + (task1.result + task2.result));
}
static class SumTask implements Runnable {
int startValue;
int endValue;
int result;
public SumTask(int startValue, int endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int sum = 0;
for (int i = startValue; i <= endValue; i++) {
sum += i;
}
result = sum;
}
}
}
위 예제에서 작업 스레드가 return으로 작업 결과를 반환하고, 요청 스레드가 그 반환 값을 바로 받을 수 있다면 훨씬 간편할 것이다. 이런 문제를 해결하기 위해 Executor 프레임워크는 Callable과 Future 인터페이스를 제공한다.
Callable, Future
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable의 call() 메서드의 반환 타입은 제네릭이다. 따라서 값을 반환할 수 있다. 그리고 throws Exception이 선언되어 있으므로, 해당 인터페이스를 구현하는 모든 메서드는 체크 예외인 Exception과 그 하위 예외를 던질 수 있다.
Runnable이 아닌 Callable을 사용하려면 다음과 같이 사용하면 된다.
public class Example {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(new MyCallable());
Integer result = future.get();
System.out.println("result = " + result);
executorService.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
int value = new Random().nextInt(10);
return value;
}
}
}
- ExecutorService의 submit() 메서드를 통해 Callcable을 작업으로 전달할 수 있다. 그러면 MyCallable 인스턴스가 블로킹 큐에 전달되고, 스레드 풀의 스레드 중 하나가 이 작업을 실행한다.
- submit() 메서드의 반환 값은 Future이다. 즉, 작업 처리 결과는 직접 반환되는 것이 아니라 Future라는 인터페이스를 통해 반환된다.
- Future의 get() 메서드를 통해 작업 처리 결과를 받을 수 있다.
ThreadPoolExeutor를 통해 스레드 풀의 스레드를 사용했고, Runnable이 아닌 Callable을 사용했다. 이를 통해 개발자가 직접 스레드를 생성하거나 join()으로 스레드를 제어할 필요가 없게 되었다. 단순하게 ExecutorService에게 필요한 작업 처리를 요청하고, 결과를 Future 형태로 받아서 사용하면 된다. 즉, 복잡한 멀티 스레드를 마치 싱글 스레드처럼 편리하게 사용할 수 있게 해주는 것이 Executor 프레임워크이다.
ExecutorService의 submit()의 반환 값인 Future
MyCallable.call() 메서드는 호출 스레드(main 스레드)가 직접 실행하는 것이 아니라, 스레드 풀의 스레드가 실행한다. 따라서 main 스레드는 언제 실행이 완료되는지 알 수 없다. 따라서 결과를 즉시 받는 것이 불가능하다. 이런 이유로 executorService.submit()의 결과로 MyCallable의 결과를 직접 반환하지 않고 MyCallable 결과를 나중에 받을 수 있는 Future라는 객체를 반환한다. 즉, Future는 전달한 작업의 미래라고 할 수 있다. 이 객체를 통해 전달한 작업의 미래 결과를 받을 수 있다.
즉, submit() 메서드의 반환 값이 Future이기 때문에 요청 스레드는 작업 처리 결과를 반환받을 때까지 블로킹되지 않고, 그 다음 작업을 이어서 할 수 있다. 그리고 요청 스레드가 작업 처리 결과가 필요할 때 future.get()을 통해 결과를 받으면 된다.
Future의 get() 메서드
요청 스레드(main 스레드)가 future.get()을 호출했을 때 MyCallable 작업을 처리하는 스레드 풀의 스레드가 요청을 완료했을 수도 있고, 아직 완료하지 못했을 수도 있다. 만약 아직 요청을 완료하지 않았을 때 get() 메서드를 호출하면 어떻게 될까? 불편하게 요청 처리 결과를 바로 반환하지 않고 Future라는 객체를 대신 반환할까?
요청 스레드가 executorService.submit()을 호출하면, ExecutorService는 전달한 작업(Callable)의 미래 결과를 알 수 있는 Future 객체를 생성한다.
- 정확히는, Future는 인터페이스이고 구현체인 FutureTask를 생성한다.
- 생성한 Future 객체 안에 Callable 인스턴스를 보관하고, Callable 객체의 작업 완료 여부와 작업의 결과 값을 갖는다.
그리고 생성한 Future 객체가 블로킹 큐에 보관된다.
그리고 future.get() 메서드를 호출했을 때
- Future가 완료 상태이면, 요청 스레드는 대기하지 않고 값을 즉시 반환받을 수 있다.
- Future가 완료 상태가 아니면, 즉 Callable 작업이 아직 수행되지 않았거나 수행 중이면, 요청 스레드는 결과 값을 반환받을 때까지 대기한다. 즉, 블로킹된다. (이때 요청 스레드의 상태는 RUNNABLE에서 WAITING이 된다.)
참고: 블로킹 메서드
Thread.join(), Future.get()과 같은 메서드는 스레드가 작업을 바로 수행하지 않고, 다른 작업이 완료될 때까지 대기한다. 이러한 메서드를 블로킹 메서드라고 한다.
submit() 메서드와 get() 메서드가 각각 존재하는 이유
다음과 같이 submit() 메서드를 통해 ExecutorService에게 작업을 요청한다. 그리고 submit() 메서드의 반환 값은 Future이며, 이 Future는 submit() 메서드 호출 즉시 반환된다. 그리고 작업 처리 결과를 받고 싶으면 get() 메서드를 호출하면 된다.
Future<Integer> future1 = executorService.submit(task1); // 여기는 블로킹 아님
Future<Integer> future2 = executorService.submit(task2); // 여기는 블로킹 아님
Integer result1 = future1.get(); // 블로킹
Integer result2 = future2.get(); // 블로킹
그런데 굳이 요청을 처리하고, 요청 처리 결과를 반환받는 메서드를 각각 둔 이유는 무엇일까? 즉, submit()과 get() 메서드를 하나로 합치면 더 간편하지 않을까?
다음과 같이 submit() 메서드 결과로 요청 처리 결과가 바로 반환된다고 가정해보자.
Integer result1 = executorService.submit(task1); // 블로킹
Integer result2 = executorService.submit(task2); // 블로킹
그러면 task1 작업을 요청하고, 요청이 완료되어 결과 값이 반환될 때까지 요청 스레드는 블로킹된다. 그리고 요청 처리 결과가 반환된 후에 task2 작업을 요청하게 된다. 즉, 싱글 스레드처럼 동작하게 된다. 이러한 이유로 작업을 요청하는 메서드와 요청 처리 결과를 반환 받는 메서드가 각각 존재하는 것이다.
Future의 get() 메서드 사용 시 주의사항
get() 메서드는 블로킹 메서드이기 때문에 싱글 스레드처럼 동작하지 않게 주의해서 사용해야 한다.
하나의 작업을 처리하는데 2초가 걸린다고 가정해보자. 그러면 다음 코드는 총 2초가 걸린다.
Future<Integer> future1 = executorService.submit(task1); // 여기는 블로킹 아님
Future<Integer> future2 = executorService.submit(task2); // 여기는 블로킹 아님
Integer result1 = future1.get(); // 블로킹 (2초 소요)
Integer result2 = future2.get(); // 블로킹 (거의 바로 반환)
그런데 다음 코드는 총 4초가 걸리며 싱글 스레드처럼 동작한다.
Future<Integer> future1 = executorService.submit(task1); // 여기는 블로킹 아님
Integer result1 = future1.get(); // 블로킹 (2초 소요)
Future<Integer> future2 = executorService.submit(task2); // 여기는 블로킹 아님
Integer result2 = future2.get(); // 블로킹 (2초 소요)
Reference
- 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성
'Language > Java' 카테고리의 다른 글
ExecutorService의 스레드 풀 관리 (1) | 2024.10.21 |
---|---|
ExecutorService의 우아한 종료 (0) | 2024.10.21 |
인터럽트 (0) | 2024.10.14 |
자바에서 자식 메서드가 부모 메서드가 던지는 체크 예외의 하위 타입만 던질 수 있는 이유 (0) | 2024.10.14 |
스레드의 생명주기 (0) | 2024.10.14 |