본문 바로가기

Language/Java

ExecutorService의 스레드 풀 관리

ExecutorService의 스레드 풀 관리

ExecutorService의 기본 구현체인 ThreadPoolExecutor의 생성자는 다음 속성들을 사용한다.

  • corePoolSize: 스레드 풀에서 관리되는 기본 스레드 수
  • maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수
  • keepAliveTime, TimeUnit unit: 기본 스레드 수를 초과해서 만들어진 초과 스레드가 생존할 수 있는 대기 시간, 이 시간 동안 처리하는 작업이 없으면 초과 스레드는 제거된다.
  • BlockingQueue workQueue: 작업을 보관할 블로킹 큐

corePoolSize와 maximumPoolSize의 차이를 알아보기 위해 다음 예제를 살펴보자.

 

printState() 메서드는 스레드 풀 관련 정보를 확인하기 위한 메서드이다.

public abstract class ExecutorUtils {

    public static void printState(ExecutorService executorService) {
        if (executorService instanceof ThreadPoolExecutor poolExecutor) {
            int pool = poolExecutor.getPoolSize();
            int active = poolExecutor.getActiveCount();
            int queuedTasks = poolExecutor.getQueue().size();
            long completedTasks= poolExecutor.getCompletedTaskCount();
            log("[pool = " + pool + ", active = " + active +
                    ", queuedTasks = " + queuedTasks + ", completedTasks = " + completedTasks + "]");
        } else {
            log(executorService);
        }
    }
    
    public static void printState(ExecutorService executorService, String taskName) {
        if (executorService instanceof ThreadPoolExecutor poolExecutor) {
            int pool = poolExecutor.getPoolSize();
            int active = poolExecutor.getActiveCount();
            int queuedTasks = poolExecutor.getQueue().size();
            long completedTasks= poolExecutor.getCompletedTaskCount();
            log(taskName + " -> [pool = " + pool + ", active = " + active +
                    ", queuedTasks = " + queuedTasks + ", completedTasks = " + completedTasks);
        } else {
            log(executorService);
        }
    }
}

 

그리고 다음과 같이 ThreadPoolExecutor를 생성하는데, 생성자를 통해 corePoolSize = 2, maximumPoolSize = 2, 큐 사이즈는 2로 설정해준다.

public class Example {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(2, 4, 3000, TimeUnit.MILLISECONDS, workQueue);

        for (int i = 1; i <= 10; i++) {
            es.execute(new RunnableTask("task" + i));
            printState(es, "task" + i);
        }
    }

    static class RunnableTask implements Runnable {

        private final String name;
        private int sleepMs = 1000;

        public RunnableTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            log(name + " 시작");
            sleep(sleepMs);
            log(name + " 완료");
        }
    }
}

 

main 메서드 실행 결과, 출력 값은 다음과 같다.

  • 작업 1, 2
    • Executor는 스레드 풀에 스레드가 core 사이즈만큼 있는지 확인한다.
    • core 사이즈만큼 없으므로 스레드를 생성한다.
    • 작업을 처리하기 위해 스레드를 하나 생성했기 때문에 작업을 큐에 넣을 필요 없이, 해당 스레드가 바로 작업을 처리한다.
    • 새로 생성된 스레드 1이 작업 1을 처리하고, 스레드 2가 작업 2를 처리한다.

  • 작업 3, 4
    • Executor는 스레드 풀에 스레드가 core 사이즈만큼 있는지 확인한다. 
    • core 사이즈만큼 이미 스레드가 만들어져 있고, 스레드 풀에 유휴 스레드가 없으므로 이 작업을 큐에 보관한다.

  • 작업 5
    • Executor는 스레드 풀에 스레드가 core 사이즈만큼 있는지 확인한다.
    • core 사이즈만큼 이미 스레드가 만들어져 있고, 스레드 풀에 유휴 스레드가 없으므로 작업을 큐에 보관하려고 한다.
    • 그러나 큐가 가득 찼으므로 Executor는 max 사이즈까지 초과 스레드를 만든다. (core = 2, max = 4이므로 기본 스레드 2개에 초과 스레드를 2개 만들 수 있다.)
    • Executor는 초과 스레드인 스레드 3을 생성한다. 즉, 스레드 3이 작업 5를 처리한다.

  • 작업 6
    • 여전히 큐가 가득 찼으므로, Executor는 초과 스레드인 스레드 4를 생성하고, 스레드 4가 작업 6을 처리한다.

  • 작업 7
    • 큐가 가득 찼고, 스레드도 max까지 초과 스레드가 생성됐다. 
    • 따라서 이 작업을 큐에 넣을 수도 없고, 작업을 수행할 초과 스레드도 생성할 수 없으므로, 작업을 거절한다. (RejectedExecutionException 발생)

 

정리하면 다음과 같다.

  • 작업을 요청하면 core 사이즈만큼 스레드를 생성한다.
  • 그리고 core 사이즈를 초과하면 큐에 작업을 넣는다.
  • 큐가 가득 차서 작업을 저장할 공간이 없으면, max 사이즈만큼 초과 스레드를 생성한다.
  • 큐도 가득 차고 max 사이즈만큼 초과 스레드를 생성했음에도 들어온 작업은 거절된다.

스레드 미리 생성하기

참고로 응답 시간이 아주 중요해서, 서버가 고객의 요청을 받기 전에 미리 스레드 풀에 스레드를 생성하고 싶으면 ThreadPoolExecutor.prestartAllCoreThreads()를 사용하면 된다. 다음과 같이 사용하면, 기본 스레드가 처음에 미리 생성된다.

public class Example {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(2, 4, 3000, TimeUnit.MILLISECONDS, workQueue);
        printState(es);

        ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) es;
        poolExecutor.prestartAllCoreThreads();
        printState(es);
    }
}

Executors가 제공하는 스레드 풀 전략

앞서 ThreadPoolExecutor 생성자를 통해 corePoolSize, maximumPoolSize, keepAliveTime, workQueue 등을 설정할 수 있고, 각각의 설정 요소들이 의미하는 바를 알아봤다.

 

그런데 Executors 클래스는 다음 메서드들을 통해 3가지 기본 전략을 제공한다. 따라서 new ThreadPoolExecutor()와 같이 직접 생성자를 사용할 필요 없이 다음 메서드들을 사용하면 된다.

  • newSingleThreadPool(): 단일 스레드 풀 전략
  • newFixedThreadPool(nThreads): 고정 스레드 풀 전략
  • newCachedThreadPool(): 캐시 스레드 풀 전략

newSingleThreadPool: 단일 스레드 풀 전략

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
  • 기본 스레드 1개만 사용한다.
  • 큐 사이즈에 제한이 없다(LinkedBlockingQueue). 따라서 초과 스레드는 생성하지 않는다.
  • 주로 간단히 사용하거나 테스트 용도로 사용한다.

newFixedThreadPool(nThreads): 고정 스레드 풀 전략

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
  • 스레드 풀에 nThreads만큼의 기본 스레드를 생성한다. 
  • 큐 사이즈에 제한이 없다. 따라서 초과 스레드는 생성하지 않는다.
  • 스레드 수가 고정되어 있기 때문에 CPU, 메모리 리소스가 어느정도 예측 가능한 안정적인 방식이다. 따라서 일반적인 상황에 가장 안정적으로 서비스를 운영할 수 있다.
  • 그러나 갑작스럽게 요청이 몰릴 경우, 적합하지 않을 수 있다. 스레드 수가 고정되어 있기 때문에 요청이 몰리더라도 작업 큐에 요청이 저장되고 큐 사이즈는 무한이므로 작업이 계속 큐에 쌓인다. 그리고 일정 갯수의 스레드에 의해 순차적으로 작업이 처리된다. 따라서 CPU와 메모리 사용량은 여유가 있는데 사용자의 요청 처리만 느려지는 문제가 발생한다.

newCachedThreadPool: 캐시 스레드 풀 전략

 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
  • 기본 스레드를 사용하지 않고, 60초의 생존 주기를 가진 초과 스레드만 사용한다. 그리고 초과 스레드의 수는 제한이 없다.
  • 작업 큐는 SynchronousQueue이다.
    • SynchronousQueue는 BlockingQueue 인터페이스의 구현체 중 하나로, 이 큐는 내부에 저장 공간이 없다.
    • 따라서 작업이 큐에 저장되는 것이 아니라, 생산자의 작업을 소비자 스레드에게 직접 전달한다.
    • 쉽게 이야기해서 저장 공간의 크기가 0이고, 생산자 스레드가 큐에 작업을 전달하면 소비자 스레드가 큐에서 작업을 꺼낼 때까지 생산자 스레드는 대기한다. 반대로, 소비자 스레드가 작업을 요청하면 생산자 스레드가 작업을 전달할 때까지 소비자 스레드는 대기한다.
    • 즉, 이름 그대로 생산자와 소비자를 동기화하는 큐이다.
  • 초과 스레드 생성 과정은 다음과 같다.
    1. 작업을 요청하면 core 사이즈만큼 스레드를 만드는데, core 사이즈가 0이므로, 바로 core 사이즈를 초과한다.
    2. core 사이즈를 초과하면 큐에 작업을 넣는데, 큐에 작업을 넣을 수 없다.
    3. 따라서 바로 초과 스레드가 생성된다. 그리고 풀에 대기 중인 초과 스레드가 있다면 해당 스레드를 재사용한다.
    4. 그리고 초과 스레드 수는 무제한이므로, 요청이 거절될 일이 없고 초과 스레드가 무제한으로 생성된다.
    5. 그리고 생성된 초과 스레드가 60초 간 작업을 처리하지 않으면 해당 스레드는 제거된다.
  • 이 전략은 기본 스레드도 없고, 대기 큐에 작업이 쌓이지도 않는다. 대신 작업 요청이 오면 바로 초과 스레드로 처리한다. 그리고 초과 스레드의 수에 제한이 없기 때문에 CPU, 메모리 자원만 허용한다면 시스템 자원을 최대로 사용하며 작업을 즉시 처리할 수 있다. 즉, 요청이 갑자기 증가하면 스레드 수가 증가하고, 요청이 줄어들면 스레드도 줄어드는 매우 유연한 전략이다.
  • 그러나 갑작스런 요청 증가로 초과 스레드 수가 무수히 많이 생성되고, 이로 인해 CPU 사용량과 메모리 사용량이 지나치게 높아지며 시스템 장애가 발생할 수 있다. 즉, 서버 자원을 최대한 사용하며 요청을 즉각적으로 처리할 수 있는 방법이지만, 자원이 감당할 수 있는 임계점을 넘는 순간 장애가 발생할 수 있다.

사용자 정의 풀 전략

고정 스레드 풀 전략은 시스템 자원을 잘 활용하지 못하며 요청의 즉각적인 처리가 불가능했고, 캐시 스레드 풀 전략은 시스템 자원을 최대로 활용하며 요청의 즉각적인 처리가 가능했지만 요청이 몰릴 경우 시스템 자원을 최대로 사용하여 자원 사용량을 예측할 수 없고 장애로 이어질 수 있다.

 

따라서 사용자 정의를 통해 위와 같은 문제들을 해결할 수 있다. 예를 들어, 다음과 같이 정의한다고 가정해보자.

ExecutorService es = new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
  • core 사이즈는 100, max 사이즈는 200이고 큐 사이즈를 1000으로 제한했다.
  • 따라서 큐가 가득 차고, 초과 스레드까지 모두 생성된 경우, 그 이후로 들어온 작업은 거절된다. (예외 정책에 따라

    RejectedExecutionException이 발생하거나, 작업이 거절되거나, 작업 요청을 한 스레드가 대신 작업을 실행한다.)

참고로 다음과 같이 설정하면 안된다. LinkedBlockingQueue를 무한대의 사이즈로 사용하기 때문에 큐가 가득찰 수가 없다. 따라서 초과 스레드가 생성될 가능성이 없으며, 기본 스레드 100개만으로 무한대의 작업을 처리해야 하는 문제가 발생한다.

ExecutorService es = new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

Executor 예외 정책

큐가 가득 차고, 초과 스레드까지 모두 생성된 경우, 그 이후로 들어온 작업을 어떻게 처리할지를 알아보자. ThreadPoolExecutor는 작업을 거절하는 다양한 정책을 제공한다.

  • AbortPolicy: 새로운 작업을 제출할 때 RejectedExecutionException을 발생시킨다. (기본 정책)
  • DiscardPolicy: 새로운 작업을 조용히 버린다.
  • CallerRunsPolicy: 새로운 작업을 제출한 스레드가 대신해서 직접 작업을 실행한다.
  • 사용자 정의(RejectedExecutionHandler): 개발자가 직접 정의한 거절 정책을 사용할 수 있다.

참고로 ThreadPoolExecutorshutdown()하면 이후에 요청하는 작업을 거절하는데, 이때도 같은 정책이 적용된다.

 

기본 정책은 AbortPolicy이며, new ThreadPoolExcutor() 생성자를 통해 정책을 설정할 수 있다.

public class ThreadPoolExecutor extends AbstractExecutorService {

    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 기본 정책
    
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }
    
    ...
}

 

이러한 정책들은 각각 RejectedExecutionHandler의 구현체이다.

AbortPolicy

RejectedExecutionHandler 구현체 중 하나인 AbortPolicy를 보면, rejectedExecution() 메서드를 재정의하고 있다. 그리고 해당 메서드 내에서 RejectedExecutionException 예외를 발생시킨다.

DiscardPolicy

DiscardPolicy를 보면 rejectedExecution() 메서드 내에서 아무런 작업도 하지 않는다. 즉, 작업을 조용히 버린다.

CallerRunsPolicy

CallerRunsPolicy는 rejectedExecution() 메서드 내부에서 r.run() 메서드가 호출된다. 즉, 작업을 요청한 스레드가 해당 작업을 직업 수행한다. 그리고 executorService.shutdown()이 호출된 경우에는 해당 작업이 수행되면 안되므로 shutdown이 아닌 경우에만 작업을 수행한다.

사용자 정의

위와 같이 RejectedExecutionHandler의 구현체를 직접 정의하여 자신만의 거절 처리 정책을 정의할 수도 있다.


Reference

  • 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성