동기화
여러 스레드가 동시에 공유 자원에 접근할 때, 접근 순서에 따라 결과 값이 달라지는 현상을 경쟁 상태(race condition)이라 한다. 이런 문제를 방지하고 데이터의 일관성과 무결성을 보장하기 위해 각 스레드가 공유 자원에 접근할 때 동시에 접근하지 못하도록 접근 순서를 제어하는 것을 동기화라고 한다. 그리고 이때 데이터 일관성을 보장하기 위해 하나의 프로세스만 접근해야 하는 영역을 임계 영역(critical section)이라 한다. 따라서 동기화는 경쟁 상태를 방지하고 여러 스레드가 임계 영역에 안전하게 접근할 수 있도록 실행 순서를 조정하는 것이다.
대표적인 동기화 기법으로 뮤텍스, 세마포어, 모니터가 있다.
뮤텍스(Mutex)
뮤텍스는 임계 영역에 하나의 스레드만 접근하도록 한다. 그리고 이를 제어하기 위해 락을 사용한다. 임계 영역에 접근한 스레드는 락을 가지고 있으며, 임계 영역을 빠져나온 후에는 락을 반납한다. 뮤텍스는 상호 배제(Mutual Exclusion)를 보장해, 하나의 임계 영역에 동시에 둘 이상의 스레드가 접근하지 못하게 한다.
임계영역에 접근하려는데 락을 획득하지 못한 스레드들은 락이 해제될 때까지 대기한다. 이때 구현 방식에 따라 두 가지로 나뉜다.
- 바쁜 대기(Busy Waiting)
- 무한루프에 빠져 스레드는 락을 획득할 수 있는지 지속적으로 확인한다. 따라서 CPU를 계속 사용한다.
- context switching이 발생하지 않아 대기 시간이 짧은 경우에는 성능이 좋을 수 있다.
- 그러나 대기 시간이 길어질수록 CPU 리소스를 낭비하게 되어 비효율적이다.
- 블로킹 대기
- 스레드가 대기 상태로 전환되어 대기 큐에 들어간다. 따라서 CPU를 점유하지 않는다. 이후에 락이 해제되면 OS가 스레드를 다시 깨운다.
- 락을 대기하는 스레드가 CPU를 점유하지 않기 때문에 효율적이다.
- 그러나 context switching 비용이 발생하여 매우 짧은 시간 동안만 대기하는 경우에는 비효율적일 수 있다.
세마포어(Semaphore)
세마포어는 뮤텍스와 유사하지만, 동시에 여러 스레드가 자원에 접근할 수 있는 스레드 수를 설정할 수 있다. 따라서 설정된 값만큼 여러 스레드가 동시에 임계 영역에 접근할 수 있다. 세마포어에서 사용 가능한 자원의 개수를 S라는 정수형 오브젝트로 표현한다.
S 값에 따라 세마포어의 종류가 두 가지로 나뉜다.
- 이진 세마포어: S 값이 0 또는 1이다. 즉, 한 번에 하나의 스레드만 임계 영역에 접근할 수 있으며, 뮤텍스와 유사하게 동작한다.
- 카운팅 세마포어: S 값이 0 이상의 정수 값을 가진다.
세마포어의 주요 연산은 다음과 같다.
- wait() (P 연산)
- 공유 자원 획득
- S가 0보다 크면, 자원을 사용할 수 있다는 의미로, P 연산을 통해 자원을 획득하고 S 값을 1 감소시킨다.
- S가 0이면, 자원이 사용 중이라는 의미이며 자원이 해제될 때까지 스레드는 대기한다.
- signal() (S 연산)
- 공유 자원 반납(락을 반납하는 과정)
- 자원을 해제하고 대기 중인 스레드가 있으면 그 스레드가 자원을 사용할 수 있다.
- S 값이 1 증가한다.
뮤텍스와 마찬가지로, 세마포어도 구현 방식에 따라 바쁜 대기 방식과 블로킹 방식으로 나뉠 수 있다.
- 바쁜 대기
- 세마포어 S 값을 1로 설정한다. 즉, 공유 자원이 하나의 프로세스만 접근할 수 있다.
- 스레드가 임계 영역에 접근하려고 할 때, S가 0 이하이면 자원이 없으므로 대기한다. 반면 S가 양수이면 자원을 획득한다.
- 만약 다른 프로세스가 이미 임계 영역에 접근한 상태여서 S의 값이 0이라면, 현재 프로세스는 CPU를 획득한 채로 while 문을 돌기만 한다(다음 프로세스가 임계 영역 수행을 마치고 V 연산을 하여 공유 데이터를 반납해야 해결되는 상황). 즉, CPU를 획득한 채로 대기만 하는 Busy Waiting 문제가 발생한다.
- 블로킹 방식
- Busy Waiting 문제가 발생하지 않는 구현 방법으로, 이 구현 방법에서 S는 세마포어 변수 value와 공유 자원을 기다리는 봉쇄 상태의 프로세스들이 저장된 큐를 갖는다.
- 공유 자원에 접근할 수 없는 프로세스는 CPU를 획득한 내내 대기하는 것이 아니라, 봉쇄 상태가 되어 공유 자원을 기다리는 큐에 줄을 서게 된다.
- 공유 자원을 사용한 프로세스가 공유 자원을 반납하면 대기 중인 프로세스에게 공유 자원이 넘어간다.
모니터(Monitor)
뮤텍스와 세마포어를 사용할 때, 락을 얻고 해제하는 것을 개발자가 직접 관리해야 한다. 따라서 코드가 복잡해지며 오류가 발생할 가능성이 높아진다. 예를 들어 락을 획득하고 해제하는 과정에서 교착 상태가 발생할 수 있다.
모니터는 이러한 문제점을 해결하기 위해 설계된 고수준 동기화 메커니즘이다. 모니터는 조건 변수(Condition Variable)를 사용하여 스레드가 특정 조건이 만족될 때까지 대기한다. 그리고 조건이 충족되면 대기 중이던 스레드가 깨워져서 자원을 사용할 수 있다.
자바에서 synchronized를 통한 동기화 메커니즘
동기화 기법인 뮤텍스, 세마포어, 모니터에 대해 개념적으로 살펴봤다. 이를 자바와 연관지어 살펴보자.
자바의 synchronized 키워드는 모니터 개념에 기반하여 구현된 동기화 메커니즘이다.
- synchronized는 메서드나 블록에 사용되어 해당 임계 영역에 대한 상호 배제를 보장한다. 락을 가진 스레드만 해당 영역을 실행할 수 있으며 다른 스레드는 락이 해제될 때까지 대기한다.
- 이때 개발자가 명시적으로 락을 관리할 필요가 없다. synchronized 영역에 접근할 때 알아서 락을 대기하고 획득한 후, 수행 후에는 알아서 락을 반납한다. 즉, synchronized는 개발자가 명시적으로 락을 관리할 필요 없이 자바가 자동으로 락을 관리해준다.
- synchronized에서 락을 획득하지 못한 스레드는 블로킹 상태로 대기한다. 따라서 CPU를 점유하지 않으므로 효율적이다.
- wait(), notify() 메서드는 조건 변수 역할을 하여, 스레드가 특정 조건을 기다리거나 조건 충족 시 깨어나도록 해준다.
정리하자면, synchronized는 개발자가 명시적으로 락을 관리할 필요 없이, 알아서 락을 대기하고 획득하고 반납한다는 점에서 모니터 개념에 기반하여 구현된 동기화 메커니즘이다. 그러나 synchronized 자체는 조건 변수가 아니다. 하지만 synchronized 불록 내에서 wait(), notify(), notifyAll() 메서드를 사용하여 조건 변수 개념을 활용할 수 있다.
조건 변수의 필요성
모니터에는 조건 변수라는 개념이 등장한다. 모니터는 조건 변수를 사용하여 스레드가 특정 조건이 만족될 때까지 대기한다. 그리고 조건이 충족되면 대기 중이던 스레드가 깨워져서 자원을 사용할 수 있다.
자바에서 Object 클래스의 메서드인 wait(), notify(), notifyAll() 같은 메서드가 조건 변수의 역할을 한다. 생산자-소비자 문제 예제를 통해 조건 변수의 필요성에 대해 알아보자.
생산자-소비자 문제
생산자-소비자 문제는 멀티스레드 프로그래밍에서 자주 등장하는 동시성 문제 중 하나로, 여러 스레드가 동시에 데이터를 생산하고 소비하는 상황을 다룬다.
- 생산자: 데이터를 생산하는 역할을 한다.
- 소비자: 생성된 데이터를 사용하는 역할을 한다.
- 버퍼: 생산자가 생성한 데이터를 일시적으로 저장하는 공간이다. 이 버퍼는 한정된 크기를 가지며, 생산자와 소비자는 이 버퍼를 통해 데이터를 주고받는다.
이러한 구조에서 발생할 수 있는 문제는 다음 두가지 경우가 있다.
- 생산자가 너무 빠를 때: 버퍼가 가득 차서 더이상 데이터를 넣을 수 없다. 생산자는 버퍼에 빈 공간이 생길 때까지, 즉 소비자가 버퍼의 데이터를 가져갈 때까지 기다린다.
- 소비자가 너무 빠를 때: 버퍼가 비어서 더이상 소비할 데이터가 없다. 소비자는 버퍼에 새로운 데이터가 생길 때까지, 즉 생산자가 버퍼에 데이터를 생산할 때까지 기다린다.
상황 가정
생산자와 소비자가 데이터를 생산하고 소비하는 공간인 버퍼를 만들고, 다음 상황을 가정해보자.
버퍼 사이즈가 2이고, 생산자 1, 2, 3이 데이터를 생산하고 이어서 소비자 1, 2, 3이 데이터를 소비한다고 가정하자. 참고로 이때 각 생산자와 소비자는 각각의 스레드이다.
- 생산자 1 → 생산자 2 → 생산자 3 → 소비자 1 → 소비자 2 → 소비자 3
BoundedQueue는 버퍼 역할을 하는 큐로, 인터페이스이다.
public interface BoundedQueue {
void put(String data) throws InterruptedException;
String take() throws InterruptedException;
}
- BoundedQueue: 한정된 사이즈를 갖는 버퍼 역할을 한다.
- put(data): 버퍼에 데이터를 보관한다. (생산)
- take(): 버퍼에 보관된 데이터를 가져간다. (소비)
예제 1: 조건 변수가 없을 때
public class BoundedQueueImpl implements BoundedQueue {
private final Queue<String> que = new ArrayDeque<>(); // 생산자와 소비자의 공유 자원
private final int max;
public BoundedQueueImpl(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) throws InterruptedException { // 공유 자원에 접근하는 메서드이므로 synchronized 붙임
while (que.size() == max) {
System.out.println("[put] 큐가 가득 참, 생산자 대기");
Thread.sleep(1000);
}
que.offer(data);
}
@Override
public synchronized String take() throws InterruptedException { // 공유 자원에 접근하는 메서드이므로 synchronized 붙임
while (que.isEmpty()) {
System.out.println("[take] 큐에 데이터가 없음, 소비자 대기");
Thread.sleep(1000);
}
return que.poll();
}
}
- 여기서 핵심 공유 자원은 que이다. 따라서 큐에 한 번에 하나의 스레드만 접근하도록 큐에 접근하는 메서드인 put과 take에 synchronized를 붙여 안전한 임계 영역을 만들었다.
- 큐에 데이터가 꽉 차서 데이터를 생산할 수 없거나, 큐에 데이터가 없어서 데이터를 소비할 수 없을 경우에는 sleep()을 사용하여 잠시 대기하고, 깨어난 다음 다시 반복문을 통해 생산/소비가 가능한지 확인하는 식으로 구현했다.
위 예제 코드대로라면 다음과 같이 실행될 것이다.
- 생산자 1이 큐에 데이터를 생산한다.
- 생산자 2가 큐에 데이터를 생산한다.
- 생산자 3이 큐에 데이터를 생산하려는데, 큐가 꽉 찼으므로 큐에 빈 공간이 생길 때까지 대기한다. (즉, 소비자가 데이터를 소비할 때까지 대기한다.)
- 소비자 1이 데이터를 소비하려는데 take 메서드에는 synchronized가 붙어있다. 즉, 락을 획득해야 하는데 현재 생산자 3이 락을 점유하고 있으므로, 락을 획득할 수 있을 때까지 대기한다.
- 마찬가지로 소비자 2와 3도 락을 획득할 수 있을 때까지 대기한다.
즉, 각 스레드의 상태는 다음과 같다.
- 생산자 1, 2: 데이터 생산 후에는 TERMINATED 상태가 된다.
- 생산자 3: sleep() 메서드로 큐에 빈 공간이 생길 때까지 대기하므로 TIMED_WAITING 상태가 된다.
- 소비자 1, 2, 3: 락을 대기하므로 BLOCKED 상태가 된다.
큐가 꽉 차서 빈 공간이 생길 때까지 대기하거나, 또는 큐에 데이터가 생성될 때까지 대기하는데 대기하는 스레드가 락을 획득한 채로 대기하기 때문에 다른 스레드가 큐에 데이터를 소비하거나 생산하지 못하는 문제가 발생한다.
락을 가지고 있는 임계 영역에 접근하고 있는 스레드가 sleep()으로 대기할 때는 아무 일도 하지 못한다. 즉, 굳이 락을 가지고 대기할 필요가 없다. 따라서 대기하는 동안 락을 반납해 다른 스레드가 임계 영역에 접근 가능하게 하고, 이를 통해 큐에 빈 공간이 생기거나 데이터가 생성되어 대기하던 스레드가 대기를 멈추고 다음 작업을 수행하도록 해야 한다. 즉, 락을 가지고 대기하는 스레드가 대기하는 동안 락을 다른 스레드에게 양보해야 한다.
자바에서 조건 변수 역할을 하는 wait()를 통해 락을 양보한 채로 대기할 수 있고 notify()를 통해 대기 중인 스레드를 깨워 락을 획득하게 할 수 있다.
예제 2: 조건 변수(wait(), notify() )도입
대기하는 스레드는 락을 가지고 있지 않고, 다른 스레드에게 양보하자. 자바의 Object.wait()과 Object.notify()를 사용하면 대기하는 동안 락을 다른 스레드에게 양보할 수 있다.
- wait(): 현재 스레드가 가진 락을 반납하고 대기한다. 이 메서드는 현재 스레드가 synchronized 영역 내에서 락을 소유하고 있을 때만 호출할 수 있다. 메서드를 호출한 스레드는 락을 반납하고, 다른 스레드가 락을 획득할 수 있도록 한다. 호출한 스레드는 대기 상태(WAITING)로 전환되고, 다른 스레드가 notify() 또는 notifyAll()을 호출할 때까지 대기한다.
- notify(): 대기 중인 스레드 중 하나를 깨운다. 이 메서드는 synchronized 영역 내에서 호출되어야 한다. 깨워진 하나의 스레드는 락을 획득할 기회를 얻게 된다.
- notifyAll(): 대기 중인 모든 스레드를 깨운다. 이 메서드는 synchronized 영역 내에서 호출되어야 한다. 모든 대기 중인 스레드가 락을 획득할 기회를 얻게 된다.
public class BoundedQueueImpl implements BoundedQueue {
private final Queue<String> que = new ArrayDeque<>(); // 생산자와 소비자의 공유 자원
private final int max;
public BoundedQueueImpl(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
while (que.size() == max) {
System.out.println("[put] 큐가 가득 참, 생산자 대기");
try {
wait(); // RUNNABLE -> WAITING, 락 반납
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
que.offer(data);
notify();
}
@Override
public synchronized String take() {
while (que.isEmpty()) {
System.out.println("[take] 큐에 데이터가 없음, 소비자 대기");
try {
wait(); // RUNNABLE -> WAITING, 락 반납
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = que.poll();
notify();
return data;
}
}
- put() 메서드
- 큐에 빈 공간이 없으면, 생산자 스레드는 wait()을 사용해서 대기한다. 이때 락을 반납하고 대기한다. (생산자 스레드는 RUNNABLE에서 WAITING 상태가 된다)
- 소비자 스레드가 데이터를 소비하고 notify()를 호출한다. 따라서 소비자가 take() 메서드를 수행한 후에 대기 중이었던 생산자 스레드가 깨어날 수 있다.
- take() 메서드
- 큐에 데이터가 없으면, 소비자 스레드는 wait()을 사용해서 대기한다. 이때 락을 반납하고 대기한다. (소비자 스레드는 RUNNABLE에서 WAITING 상태가 된다)
- 생산자 스레드가 데이터를 생산하고 notify()를 호출한다. 따라서 생산자가 put() 메서드를 수행한 후에 대기 중이었던 소비자 스레드가 깨어날 수 있다.
wati()과 notify()를 통해 앞서 발생했던 문제가 해결되었다. 그러나 여전히 비효율적인 문제가 남아있다.
스레드가 wait()을 호출하여 대기 상태로 들어가면, 대기 중인 스레드는 어떤 스레드 대기 집합에서 대기하게 된다. 그리고 어떤 스레드가 notify()를 호출하면, 대기 집합에서 대기 중인 스레드 중 하나의 스레드가 깨워진다. 그런데 대기 집합에는 생산자 스레드와 소비자 스레드가 함께 섞여 있고, 어떤 종류의 스레드가 깨워질지 알 수 없다. 따라서 다음과 같은 비효율이 발생할 수 있다.
- 상황 1: notify()
- 소비자 스레드가 큐에 있는 마지막 데이터를 소비하고 큐에 데이터가 없는 상태이다. 그리고 notify()를 호출했다.
- 이때 생산자 스레드가 깨워지면 베스트이다. 그러나 소비자 스레드가 깨워졌다면, 큐에 데이터가 없으므로 다시 wait()을 통해 대기하게 된다.
- 상황 2: notifyAll()
- 소비자 스레드가 큐에 있는 마지막 데이터를 소비하고 큐에 데이터가 없는 상태이다. 그리고 notifyAll()를 호출했다.
- 대기 집합에 있는 모든 스레드가 깨어난다. 깨어난 스레드들 중 가장 먼저 락을 획득한 스레드만 임계 영역에 접근할 수 있고, 나머지 스레드들은 BLOCKED 상태가 되어 락을 대기한다.
- 만약 가장 먼저 락을 획득한 스레드가 소비자 스레드라면, 큐에 데이터가 없으므로 다시 대기 집합에 들어간다.
즉, 특정 스레드를 깨우고 싶지만, notify()와 notifyAll()은 어떤 스레드가 임계 영역에 들어가 로직을 수행할지 알 수 없기 때문에 비효율적인 상황이 발생할 수 있는 것이다.
예제 3: Condition 도입을 통해 비효율 해결
위 문제를 해결해주는 것이 자바의 Condition이다. Condition은 wait(), notify() 보다 더 세밀한 조건 제어를 가능하게 한다.
생산자 스레드는 데이터를 생산한 후에 대기 중인 소비자 스레드를 깨워야 한다. 그리고 소비자 스레드는 데이터를 소비한 후에 대기 중인 생산자 스레드를 깨워야 한다. 결국 생산자 스레드가 대기하는 집합과 소비자 스레드가 대기하는 집합을 나누고, 상황에 맞게 적절한 집합에 notify를 하면 된다.
public class BoundedQueueImpl implements BoundedQueue {
private final Lock lock = new ReentrantLock();
private final Condition producerCondition = lock.newCondition(); // 생산자 스레드를 위한 대기 공간
private final Condition consumerCondition = lock.newCondition(); // 소비자 스레드를 위한 대기 공간
private final Queue<String> que = new ArrayDeque<>(); // 생산자와 소비자의 공유 자원
private final int max;
public BoundedQueueImpl(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock(); // synchronized 대신 Lock으로 락 획득
try {
while (que.size() == max) {
System.out.println("[put] 큐가 가득 참, 생산자 대기");
try {
producerCondition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
que.offer(data);
consumerCondition.signal();
} finally {
lock.unlock(); // 락 해제
}
}
@Override
public String take() {
lock.lock(); // 락 획득
try {
while (que.isEmpty()) {
System.out.println("[take] 큐에 데이터가 없음, 소비자 대기");
try {
consumerCondition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = que.poll();
producerCondition.signal();
return data;
} finally {
lock.unlock(); // 락 해제
}
}
}
- Condition을 사용하기 위해 앞서 synchronized를 사용하던 방식에서 Lock과 ReentrantLock을 사용하는 방식으로 변경했다.
- Condition
- ReentrantLock을 사용하는 스레드가 대기하는 스레드 대기 공간이다. 참고로 Object.wait()에서 사용한 스레드 대기 공간은 모든 객체 인스턴스가 내부에 기본으로 가지고 있다. 반면에 Lock(ReentrantLock)을 사용하는 경우 이렇게 스레드 대기 공간을 직접 만들어 사용해야 한다.
- Condition 객체를 두 개 만들어서 생산자 스레드가 대기하는 공간과 소비자 스레드가 대기하는 공간을 구분했다.
- condition.await()
- Object.wait()과 유사한 기능이다. 지정한 condition에서 스레드가 WAITING 상태로 대기한다.
- condition.signal()
- Object.notify()와 유사한 기능이다. 지정한 condition에서 대기 중인 스레드 중 하나를 깨운다.
- 생산자 스레드가 작업을 처리한 후에는 소비자 스레드가 대기 중인 condition에 signal()을 호출하고, 소비자 스레드가 작업을 처리한 후에는 생산자 스레드가 대기 중인 condition에 signal()을 호출했다.
이렇게 생산자 스레드와 소비자 스레드가 대기하는 공간을 분리함으로써 앞서 대기 중인 스레드를 깨울 때 비효율적인 문제를 해결할 수 있게 되었다.
java.util.concurrent.BlockingQueue
앞서 만든 큐는 BlockingQueue와 매우 유사하다. BlockingQueue는 인터페이스이고 대표적인 구현체는 다음과 같다.
- ArrayBlockingQueue: 배열 기반으로 구현되어 있고, 버퍼의 크기가 고정되어 있다.
- LinkedBlockingQueue: 링크 기반으로 구현되어 있고, 버퍼의 크기를 고정할 수도 있고 또는 무한하게 사용할 수도 있다.
ArrayBlockingQueue.put() 코드를 확인해보면, 앞서 직접 만든 큐와 매우 유사한 것을 알 수 있다.
public class ArrayBlockingQueue {
final Object[] items;
int count;
ReentrantLock lock;
Condition notEmpty; //소비자 스레드가 대기하는 condition
Condition notFull; //생산자 스레드가 대기하는 condition
public void put(E e) throws InterruptedException {
lock.lockInterruptibly(); // 락 획득
try {
while (count == items.length) {
notFull.await(); // 생산자 스레드 대기
}
enqueue(e);
} finally {
lock.unlock(); // 락 해제
}
}
private void enqueue(E e) {
items[putIndex] = e;
count++;
notEmpty.signal(); // 소비자 스레드에게 알림
}
}
BlockingQueue를 사용하여 기존 코드를 변경하면 다음과 같다.
public class BoundedQueueImpl implements BoundedQueue {
private BlockingQueue<String> que;
public BoundedQueueImpl(int max) {
que = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
try {
que.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return que.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Reference
- 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성
'Language > Java' 카테고리의 다른 글
자바의 동기화 메커니즘(volatile, synchronized, Lock(ReentrantLock), Atomic/CAS, 동기화 컬렉션) (0) | 2024.10.25 |
---|---|
ExecutorService의 스레드 풀 관리 (1) | 2024.10.21 |
ExecutorService의 우아한 종료 (0) | 2024.10.21 |
Executor 프레임워크(ExecutorService, Callable, Future) (0) | 2024.10.18 |
인터럽트 (0) | 2024.10.14 |