저는 개인적으로 채팅 서비스를 토이 프로젝트로 개발하고 있습니다.
웹소켓, Kafka를 이용해서 메시지 저장 기능을 개발했었는데요.
이미지처럼 메시지를 한번 송신했는데 10번이 저장되는 이슈가 발생했습니다.
어떻게 된 일일까요?
콘솔 메시지를 확인해보니 KafkaException과 그 밑에는 코드 실수로 인한 NPE 예외가 콘솔에 찍혀있었습니다.
메시지를 저장하고, 그 이후의 데이터를 처리하는 과정에서 NPE가 발생했고,
consume 메서드 내에서 예외가 발생해서 리트라이가 발생한 이슈였습니다.
그렇다면 10번 메시지가 저장됐던 이유는 무엇일까요?
위 코드에서 DEFAULT_BACK_OFF의 maxAttempt가 9로 설정돼있으므로
9번의 재시도 이후 열번째의 시도에서 메시지를 저장하고, 실패하면서 consume 과정이 마무리됩니다.
물론 지금의 프로젝트에서 재시도 횟수가 중요한 이슈는 아니었지만
재시도를 비롯한 예외 처리에 대한 로직을 설정하는 방법을 간단하게 확인했습니다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ChatMessageDTO> messageListner() {
ConcurrentKafkaListenerContainerFactory<String, ChatMessageDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(errorHandler());
return factory;
}
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(5, 5);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
// logic to execute when all the retry attemps are exhausted
}, fixedBackOff);
return errorHandler;
}
위 코드처럼 ErrorHandler에 BackOff를 등록하고,
ErrorHandler를 KafkaListenerContainerFactory에 등록해서 사용하면 됩니다.
참고 문헌
https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html
'Troubleshooting' 카테고리의 다른 글
+ ""는 의미가 있을까요? (4) | 2024.02.19 |
---|