본문 바로가기

Backend/Kafka

카프카 오프셋 커밋과 메시지 손실, DLT

프로젝트에서 카프카를 사용 중인데 처음 사용해보는거라 아직 모르는 부분이 많다. 카프카를 사용해보며 들었던 의문들을 정리해보고자 한다.

자동 커밋 vs 수동 커밋

카프카의 자동 커밋(auto commit) 기능을 사용하면, 메시지를 소비한 후 일정 주기마다 오프셋이 자동으로 커밋된다. 이 과정에서 메시지를 가져온 후 처리하는 과정에 예외가 발생하더라도, 이미 오프셋이 커밋되었기 때문에 해당 메시지를 다시 소비할 수 없는 문제가 발생한다.

 

예제를 통해 살펴보자. 다음은 카프카 설저어 코드와 리스너 코드이다.

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "default-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaTemplate<String, String> kafkaTemplate)  {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(
            topics = "test-topic",
            groupId = "default-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consume(String payload) {
        log.info("Kafka message consume. payload={}", payload);
        throw new RuntimeException();
    }
}

 

test-topic으로 메시지 1이 발행되었다.

 

그러나 소비자 서버에서 메시지 소비 후 메시지 처리 중 예외가 발생했다. 그러나 자동 커밋이므로 다음과 같이 CURRENT-OFFSET이 1로 설정되었다.

 

따라서 메시지 누락을 방지하기 위해 다음과 같이 수동 커밋 모드를 사용하자.

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "default-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 추가
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaTemplate<String, String> kafkaTemplate)  {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 추가
        return factory;
    }
}

 

수동 커밋에서는 메시지 처리 후 직접 커밋해줘야 한다.

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(
            topics = "test-topic",
            groupId = "default-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consume(String payload, Acknowledgment acknowledgment) {
        log.info("Kafka message consume. payload={}", payload);
        acknowledgment.acknowledge(); // 커밋
    }
}

오프셋 커밋에 실패해 lag이 쌓인 경우

상황 1. 커밋 실패 → 실패 

메시지 1을 소비하다가 예외가 발생해 커밋에 실패했다. 따라서 다음과 같이 CURRENT-OFFSET이 0이고, LAG이 1이다.

 

이어서 메시지 2를 발행할 때, 소비자 서버는 메시지 2를 소비한다. 그러나 예외가 발생해 커밋에 실패했다. 따라서 다음과 같이 CURRENT-OFFSET이 0이고, LAG이 2이다.

 

참고로 이때 스프링 로그를 확인해보면, CURRENT-OFFSET에 따라 메시지 1부터 소비한 후, 메시지 2를 소비할 것이라고 생각했는데, 메시지 2만 소비한다. 해당 블로그 글을 보면, 컨슈머 서버 내부에서 관리하는 오프셋이 따로 있어서, 이를 기반으로 poll하기 때문에 그렇다고 한다. 

 

이때 스프링 서버를 종료 후 재시작하면, 메시지 1, 2를 순차적으로 재소비를 시도한다.

상황 2. 커밋 실패 → 성공

이번에는 커밋 실패 후 이어서 커밋에 성공하는 예제를 살펴보자. 랜덤으로 실패와 성공을 해야하므로 다음과 같이 코드를 작성해줬다.

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(
            topics = "test-topic",
            groupId = "default-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consume(String payload, Acknowledgment acknowledgment) {
        log.info("Kafka message consume. payload={}", payload);
        int randomNumber = new Random().nextInt(2);
        if (randomNumber == 0) {
            log.info("커밋 성공!");
            acknowledgment.acknowledge();
        } else {
            log.info("예외 발생, 커밋 실패!");
            throw new RuntimeException();
        }
    }

 

이때 기본적으로 커밋에 실패하면 메시지 재소비가 9번 일어난다. 예제를 위해 재소비를 하지 않도록, 다음과 같이 FixedBackOff를 설정해줬다.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaTemplate<String, String> kafkaTemplate)  {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        
        FixedBackOff fixedBackOff = new FixedBackOff(0, 1); // 추가
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(fixedBackOff); // 추가

        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setCommonErrorHandler(defaultErrorHandler); // 추가
        return factory;
    }

 

메시지 1을 소비한 후 커밋에 실패했다. 따라서 CURRENT-OFFSET은 0이다.

 

이어서 메시지 2가 발행되었고, 소비자 서버가 메시지 2를 소비한 후 커밋에 성공했다. 이때 CURRENT-OFFSET은 2가 되었다.

 

이미 오프셋 커밋이 2가 되었으므로, 이때 스프링 서버를 재시작하더라도 메시지 1을 재소비하지 않는다. 따라서 메시지 1을 정상적으로 처리하지 못했는데 그 이후에 발행된 메시지 2를 커밋하며 메시지 1을 누락해버리는 결과가 발생한다.

DLT 사용

따라서 메시지 처리에 실패한 메시지가 누락되는 것을 막기 위해 Dead Letter Topic을 사용할 수 있다. 다음과 같이 DLT를 설정할 수 있다. 

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "default-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaTemplate<String, String> kafkaTemplate)  {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate); // DLT 설정
        FixedBackOff fixedBackOff = new FixedBackOff(0, 0);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(recoverer, fixedBackOff);

        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setCommonErrorHandler(defaultErrorHandler);
        return factory;
    }
}

 

커밋에 실패한 메시지는 DLT로 저장된다. 따라서 별도의 프로세스가 DLT을 처리하거나 개발자가 직접 DLT을 처리하는 등 처리되지 않은 메시지에 대해 처리할 수 있다. 참고로 DLT으로 저장될 경우, 해당 메시지는 기존 토픽에서 커밋되지는 않는다.

 

DLT를 설정한 후 메시지 1 커밋에 실패한 경우를 테스트해보면, 다음과 같이 DLT에 메시지가 저장된 것을 확인할 수 있다.

'Backend > Kafka' 카테고리의 다른 글

카프카란 무엇인가?  (1) 2025.01.21