0. 개요
이 글은 프로젝트를 진행하며 메시지 브로커로 Redis Stream을 활용한 경험을 작성한 글이다.
메인 서버(가챠 서버)에서 사용자가 특정 등급의 아이템을 모두 획득한 경우, 로또를 발급해준다. 이때 로또 발급 로직을 별도의 프로세스로 진행하고자 로또 서버를 생성했다. 그리고 메시지 브로커로 두 서버 간 통신을 이뤄 비동기 통신을 구현하고자 했다. Kafka, RabbitMQ 등의 메시지 브로커가 있는데, 기존에 Redis를 사용 중이었기 때문에 Redis를 활용했다. Redis의 pub/sub과 stream을 메시지 브로커로 사용할 수 있다.
1. Redis Stream
일반적으로 레디스를 이용해 메시지를 발행할 때는 pub/sub을 많이 사용한다. 하지만 이 방식은 메시지를 발행했을 때 수신자 서버가 장애 상태여서 메시지를 수신하지 못하면, 메시지는 휘발되고, 수신자 서버가 재시작했을 때 수신하지 못한 메시지를 뒤늦게 소비하는 것이 불가능하다. 또한 여러 개의 수신자 서버를 구동하면 발행자는 동일한 메시지를 여러 개 발행해야 하는 문제가 있다.

반면, 레디스 strem은 휘발성이 아니라 카프카 offset 개념처럼 마지막으로 수신한 record id를 저장하고 XADD, XREADGROUP, XACK, XPENDING, XCLAIM 등의 커맨드를 통해 메시지를 컨트롤할 수 있는 다양한 방법을 제공한다.
2. ACK + Pending Message 재처리
자동 ACK 모드
리스너 컨테이너를 등록할 때 'receiveAutoAck()'을 사용하면 자동 ACK 모드가 된다. 자동 ACK 모드로 설정한 상태에서, 메시지를 컨슘한 후에 메시지 처리 중에 에러가 발생해 결과적으로 메시지를 정상적으로 처리하지 못하면 어떻게 될까?
다음 예제는 자동 ACK 모드로 설정한 상태에서 메시지 처리 중 에러가 발생한 예제이다.
@Slf4j
@Component
@RequiredArgsConstructor
public class LottoMessageListener implements StreamListener<String, MapRecord<String, String, String> >, InitializingBean {
@Value("${spring.data.redis.stream.lotto-issuance-requests}")
private String streamKey;
private final String CONSUMER_GROUP_NAME = "default-group";
private final String CONSUMER_NAME = "default-name";
private final StreamMessageListenerContainer listenerContainer;
private final RedisTemplate<String, String> redisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("Redis Stream consume. stream = {}, message = {}", streamKey, message.toString());
try {
throw new RuntimeException();
} catch (Exception e) {
log.error("메시지 처리 중 예외 발생");
}
}
@Override
public void afterPropertiesSet() {
if (!Boolean.TRUE.equals(redisTemplate.hasKey(streamKey))) {
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), CONSUMER_GROUP_NAME);
} else {
boolean groupExists = redisTemplate.opsForStream()
.groups(streamKey)
.stream()
.anyMatch(group -> group.groupName().equals(CONSUMER_GROUP_NAME));
if (!groupExists) {
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), CONSUMER_GROUP_NAME);
}
}
Consumer consumer = Consumer.from(CONSUMER_GROUP_NAME, CONSUMER_NAME);
StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
listenerContainer.receiveAutoAck(consumer, streamOffset, this); // 자동 ACK
listenerContainer.start();
}
}
해당 서버는 에러가 발생했으며, redis cli로 접속해 lotto-issuance-requests 스트림의 default-group 소비자 그룹의 팬딩 리스트를 확인해보자.
> XPENDING lotto-issuance-requests default-group
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
ACK되지 않은 메시지가 0개, 즉 ACK된 것을 알 수 있다. 따라서 메시지를 정상적으로 처리하지 못했음에도 ACK이 자동으로 보내져 정합성 문제가 발생하게 된다.
수동 ACK 모드
이번에는 수동 ACK 모드로 설정해보자.
@Slf4j
@Component
@RequiredArgsConstructor
public class LottoMessageListener implements StreamListener<String, MapRecord<String, String, String> >, InitializingBean {
@Value("${spring.data.redis.stream.lotto-issuance-requests}")
private String streamKey;
private final String CONSUMER_GROUP_NAME = "default-group";
private final String CONSUMER_NAME = "default-name";
private final StreamMessageListenerContainer listenerContainer;
private final RedisTemplate<String, String> redisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("Redis Stream consume. stream = {}, message = {}", streamKey, message.toString());
try {
throw new RuntimeException();
} catch (Exception e) {
log.error("메시지 처리 중 예외 발생");
}
}
@Override
public void afterPropertiesSet() {
...
listenerContainer.receive(consumer, streamOffset, this); // 수동 ACK
...
}
}
ACK을 보내기 전에 예외가 발생했으므로 ACK되지 않은 1개의 메시지가 팬딩 리스트에 존재한다.
> XPENDING lotto-issuance-requests default-group
1) (integer) 1 # ACK되지 않은 메시지 개수
2) "1741325991200-0" # 가장 오래된 미처리 메시지 ID
3) "1741325991200-0" # 가장 최신 미처리 메시지 ID
4) 1) 1) "default-name" # 이 메시지를 소비한 소비자 이름
2) "1" # 해당 소비자가 ACK하지 않은 메시지 개수
따라서 수동 ACK 모드로 설정하여 메시지 처리가 완료되면 ACK을 보내도록 했다. 또한 처리 중 예외가 발생하여 팬딩 리스트에 남아있는 메시지들은 스케줄러를 통해 별도로 처리하도록 했다.
@Slf4j
@Component
@RequiredArgsConstructor
public class LottoMessageListener implements StreamListener<String, MapRecord<String, String, String> >, InitializingBean {
@Value("${spring.data.redis.stream.lotto-issuance-requests}")
private String streamKey;
private final String CONSUMER_GROUP_NAME = "default-group";
private final String CONSUMER_NAME = "default-name";
private final StreamMessageListenerContainer listenerContainer;
private final RedisTemplate<String, String> redisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("Redis Stream consume. stream = {}, message = {}", streamKey, message.toString());
try {
throw new RuntimeException();
} catch (Exception e) {
log.error("메시지 처리 중 예외 발생");
}
}
@Scheduled(fixedRate = 10000)
public void pendingMessageScheduler() {
log.info("Start pending message scheduler");
StreamOperations<String, Object, Object> streamOps = redisTemplate.opsForStream();
PendingMessagesSummary summary = streamOps.pending(streamKey, CONSUMER_GROUP_NAME);
long totalPendingMessagesCount = summary.getTotalPendingMessages();
if (totalPendingMessagesCount > 0) {
PendingMessages pendingMessages = streamOps.pending(streamKey, CONSUMER_GROUP_NAME, Range.closed("0", "+"), totalPendingMessagesCount);
List<RecordId> idList = pendingMessages.stream()
.map(pendingMessage -> pendingMessage.getId())
.toList();
RecordId[] array = idList.toArray(new RecordId[0]);
streamOps.claim(streamKey, CONSUMER_GROUP_NAME, CONSUMER_NAME, Duration.ofMillis(0), array)
.stream()
.forEach(message -> {
log.info("Retry pending message = {}", message.getValue());
// 메시지 재처리 후 ACK
redisTemplate.opsForStream().acknowledge(streamKey, CONSUMER_GROUP_NAME, message.getId());
});
}
}
...
}
스케줄러가 동작하면, 팬딩 메시지들이 재처리된 후 ACK된다. 따라서 redis cli를 통해 확인해보면 팬딩 메시지 내역이 모두 사라진 것을 볼 수 있다.
> XPENDING lotto-issuance-requests default-group
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
Reference
- https://dev.gmarket.com/113
- 개발자를 위한 레디스, 김가림, 에이콘
'Backend > Redis' 카테고리의 다른 글
레디스에서 다중 명령에 대한 원자성을 보장하는 방법 (0) | 2024.11.27 |
---|---|
센티널 로컬과 도커에 적용해보기 (1) | 2024.05.31 |
레디스 복제(Replication)와 센티널(Sentinel) (0) | 2024.05.30 |
프로젝트에서 레디스를 사용하며 한 고민들 (0) | 2024.05.24 |
스프링부트에서 Redis 사용하기 (0) | 2024.02.17 |