Backend/RabbitMQ

RabbitMQ

olsohee 2024. 5. 2. 11:40

메시지 브로커가 필요한 이유

RabbitMQ는 AMQP(Advanced Message Queuing Protocol)라는 표준 프로토콜을 사용하는 메시지 브로커로, 송신자로부터 전달받은 메시지를 수신자로 전달해주는 중간 역할을 한다. 즉, 메시지 큐를 관리하고 애플리케이션 간에 메시지를 송수신할 수 있게 해준다.

 

그런데 이러한 메시지 브로커가 필요한 이유가 뭘까? 즉, 애플리케이션 간에 직접적인 통신이 아니라, 굳이 중간에 메시지 브로커를 두는 이유가 뭘까?

  • 비동기 처리를 통해 애플리케이션 간 결합도를 낮출 수 있다.
    • 만약 메시지 브로커를 사용하지 않고 송신 측과 수신 측이 직접적으로 통신한다면 어떻게 될까? 동기적 통신의 경우, 송신 측은 수신 측에게 작업을 요청하고 요청에 대한 응답이 돌아올 때까지 블로킹 상태가 된다. 또한 수신 측에 장애가 발생했을 경우 송신 측까지 장애가 전파된다.
    • 반면, 메시지 브로커를 사용하면 비동기적 통신이 가능하다. 따라서 즉시 응답을 받을 필요가 없는, 백그라운드에서 처리되어도 되는 작업들을 메시지 브로커를 통해 비동기적으로 처리하면, 작업을 요청하는 송신 측은 작업 처리 시간동안 블로킹되어 있지 않을 수 있어 더욱 효율적이다. 게다가 메시지를 보내는 이는 메시지를 받을 이의 주소를 몰라도 되어 두 애플리케이션 간 결합도가 낮아진다. 
  • 메시지 전달을 보장한다.
    • 그런데 @Async를 사용하는 등 메시지 브로커를 사용하지 않고 비동기적으로 작업을 요청할 수 있는데? 그리고 수신 측에 장애가 발생했을 경우 이에 대한 예외 처리가 송신 측에 되어 있으면 괜찮지 않을까? 그러나 수신 측 장애에 대한 예외 처리가 되어 있다고 하더라도, 비동기적으로 보낸 요청이 사라지는 문제가 있다. 즉, 메시지의 전달이 보장되지 않는 문제가 발생한다. 죽었던 수신 측의 서버가 되살아나더라도 이미 전에 왔던 요청들은 사라진 후이다.
    • 반면, 메시지 브로커를 사용하면 수신 측의 서버가 되살아났을 때 큐에 쌓인 메시지를 읽을 수 있다.

RabbitMQ의 메시지 전달 방식

 

RabbitMQ에서는 Producer, Consumer, Exchange, Queue 등의 개념이 등장한다. ProducerQueue에 직접적으로 연결되지 않고 Exchange를 거쳐 Queue에 연결된다.

 

Exchange(교환기)는 메시지를 받아서 라우팅 규칙에 따라 적절한 Queue로 라우팅하는 역할을 한다. Queue(큐)는 메시지를 저장하는 버퍼 역할을 한다. Producer가 보낸 메시지가 큐에 저장되고 Consumer가 큐에서 메시지를 가져가 처리한다.

 

라우팅 규칙은 다음과 같이 Direct, Fanout, Topic, Header 4가지가 있다.

  • Direct Exchange: 메시지에 포함된 라우팅 키와 일치하는 큐로 메시지를 전달한다.

 

  • Fanout Exchange: 라우팅 키와 상관없이 Exchange와 연결된 모든 큐에게 메시지를 전달한다. 

 

  • Topic Exchange: 라우팅 키의 패턴 매칭을 통해 메시지를 전달한다. 즉, 라우팅 키 문자열 내 *, #를 삽입시켜 Direct보다 좀 더 유연한 바인딩이 가능하다.

 

  • Header Exchange: 라우팅 키 대신 메시지 헤더를 기반으로 라우팅한다.
    • producer 측에서 정의하는 헤더는 message와 함께 publish 하며, consumer 측에서는 교환기와 큐를 바인딩하는 시점에 argument를 정의한다. (header 타입으로 라우팅 키를 설정해도 무시된다.)
    • 이때 x-match 키에 대한 값은 다음과 같다.
      • all: header의 모든 key-value 쌍 값과 argument의 모든 key-value 쌍 값이 일치해야 바인딩 된다.
      • any: argument의 key-value 쌍 값 중 하나라도 header의 key-value 쌍 값과 일치하는 쌍이 존재하면 바인딩 된다.

 

그런데 문득 교환기와 큐가 별도로 존재하는 이유가 궁금해졌다. 교환기와 큐가 분리되어 있을 때 얻는 이점이 무엇일까?

  • 유연한 라우팅: 교환기는 라우팅 전략에 따라 메시지를 하나 이상의 큐에 전달할 수 있다. 따라서 특정 메시지는 모든 소비자에게 브로드캐스트할 수 있고(Fanout), 또 특정 메시지는 조건에 맞는 소비자에게만 전달할 수도 있다(Topic).
  • 확장성: 새로운 큐를 추가하는 경우를 예로 들어보자. 만약 교환기 없이 큐만 존재한다면, 새롭게 큐를 추가했을 때 송신 측이 새로 추가된 큐로 메시지를 보내는 로직을 추가해야 한다. 그러나 교환기와 큐가 별도로 존재하면, 새롭게 큐를 추가했을 때 큐를 적절한 교환기와 바인딩만 하면 되고, 송신 측에는 변경 사항이 존재하지 않는다. 송신 측은 교환기로 메시지를 보내기만 할 뿐이며 교환기가 알아서 적절한 여러 큐들에게 메세지를 전송한다. 

예제

다음은 프로젝트에서 rabbitMQ를 사용했던 방식이다. 

 

우선 RabbitMQ 관리자 화면으로 접속해 "funfit"이라는 이름의 교환기를 생성했다. 이때 교환기의 타입을 direct로 지정했다.

 

그리고 "create_new_member"라는 이름의 큐를 생성하고, 해당 큐를 funfit 교환기와 바인딩해준다. 이때 라우팅 키는 큐 이름과 같게 설정했다.

 

이제 애플리케이션 코드를 보자. 송신 측은 다음과 같이 "funfit"이라는 교환기에 "create_new_member"라는 라우팅 키를 가지고 메시지를 보냈다.

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitMqService {

    private final RabbitTemplate rabbitTemplate;

    public void publishCreateNewMember(CreateNewMemberPubDto createNewMemberPubDto) {
        log.info("RabbitMQ | publish message, queue name = create_new_member, message = {}", createNewMemberPubDto.toString());
        rabbitTemplate.convertAndSend("funfit", "create_new_member", createNewMemberPubDto);
    }
}

 

funfit 교환기는 direct 타입이므로, 메시지를 받아 "메시지의 라우팅 키 == 큐의 라우팅 키"인 큐에 메시지를 보낼 것이다. 따라서  funfit 교환기에 도착한 메시지는, "create_new_member"라는 라우팅 키로 바인딩 된 create_new_member 큐로 전달될 것이다.

 

다음은 create_new_member 큐에 메시지가 도착한 모습이다.

 

이어서 수신 측은 다음과 같은 RabbitMQ 설정 파일을 통해 교환기를 설정했다.

@Configuration
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.exchange}")
    private String exchange;

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        rabbitTemplate.setExchange(exchange); // 교환기 설정
        return rabbitTemplate;
    }
}

 

그리고 다음과 같이 "crete_new_member"라는 이름의 큐를 구독하도록 설정했다. 따라서 해당 큐에 메시지가 도착하면 수신 측은 메시지를 전달받는다.

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitMqService {

    @RabbitListener(queues = "create_new_member")
    public void createRelationship(CreateNewMemberSubDto dto) {
        log.info("RabbitMQ | on message, queue name = create_new_member, message = {}", dto.toString());
    }
}

 

다음은 수신 측 애플리케이션 서버를 켰을 때 큐에 있던 1개의 메시지가 정상적으로 수신 측으로 전달된 것을 나타낸다.


Reference