본문 바로가기
Web/BackEnd

[Backend] RabbitMQ 에러 핸들링 (feat. Retry)

by 희조당 2024. 7. 14.
728x90

무한 예외 발생

MQ를 활용한 기본 발급 로직을 짠 뒤, 실패 테스트를 하던 중에 리스너에서 계속 예외를 던지는 상황을 마주하게 된다.

Execution of Rabbit message listener failed.

 

기본적으로 RabbitMQ에서는 다른 조치가 없으면 리스너에서 예외가 발생하면 다시 큐에 집어넣게 된다.

그리곤 무한히 반복하게 되기 때문에 이런 상황을 마주한 것이다.

무한 Retry 해결하기

이 상황을 해결하는 가장 쉬운 방법은 다음과 같다.

1. Requeue 옵션 설정하기

다음과 같이 yml 설정하면 가장 간단하게 Requeue 옵션을 제어할 수 있다.

# application.yml
spring:
  rabbitmq:
    listener:
      simple: # exchange 옵션에 따라 다르다!
        default-requeue-rejected: false

 

나는 이상하게도 yml으로 해결되지 않아 listener에 설정을 추가해줬다.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
    var containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(cachingConnectionFactory);
    containerFactory.setDefaultRequeueRejected(false);
    containerFactory.setMessageConverter(jackson2JsonMessageConverter());
    return containerFactory;
}

💡 ListenerContainerFactory

리스너 설정을 대표적으로 관리하는 친구이다.
ConnectionFactory를 통해서 RabbitMQ와의 연결을 설정한다.
재시도 정책, 동시성 설정 등을 여기서 제어할 수 있다.

2. AmqpRejectAndDontRequeueException

이 예외를 그냥 던져버리면 된다. Default 설정과 관계없이 실패한 메시지를 큐에 넣지 않는다.

3. RetryOperationsInterceptor

예외는 다양한 이유로 발생할 수 있다. 따라서, 무조건적으로 재시도를 막는 건 좋지 않다.

여기서도 두 가지 방법으로 재시도를 할 수 있는데 가장 쉬운 방법은 Spring-retry를 사용하는 것이다.

 

@Retryable을 걸어줌으로 재시도를 쉽게 적용시킬 수 있다.

@Retryable
@RabbitListener(queues = QUEUE)
public void onListen(...) {
  // do something...
}

 

이 방법은 리스너마다 재시도 전략을 세울 수 있다는 장점이 있다.

반면에 불필요한 의존성이 추가된다는 단점이 존재한다. 경우에 따라서 AOP 의존성도 같이 넣어줘야하기 때문이다.

 

전역적으로 동일한 설정을 해주고 싶다면 ListenerContainerFactory에 설정을 추가해주는 방법도 존재한다.

@Bean
public RetryOperationsInterceptor retryOperationsInterceptor() {
  return RetryInterceptorBuilder.stateless()
    .retryPolicy(new SimpleRetryPolicy(3))
    .build();
}

@Bean
public SimpleRabbitListenerContainerFactory listenerContainerFactory(...) {
  var containerFactory = new SimpleRabbitListenerContainerFactory();
  containerFactory.setAdviceChain(retryOperationsInterceptor());
  ...
}

 

위와 같이 작성하거나 다음과 같이 적용시킬 수 있다.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
    var containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(cachingConnectionFactory);

    containerFactory.setAdviceChain(
      RetryInterceptorBuilder.stateless()
        .maxAttempts(3)
        .backOffOptions(Duration.ofSeconds(3L).toMillis(), 2, Duration.ofSeconds(10L).toMillis())
        .recoverer(new RejectAndDontRequeueRecoverer())
        .build()
    );

    return containerFactory;
  }

Dead Letter Queue

실패한 메시지를 단순하게 재시도 후 버리면 추후 예외에 대한 추적이 어려울 수 있다.

따라서, 실패한 메시지를 모아두는 공간이 필요한데 그걸 DLQ라고 부른다.

 

기존 큐에서 실패했을 때 어디로 넣어줄지 알려준 뒤, 실패한 메시지를 받을 큐와 다른 설정들을 해주면 된다.

@Bean
public Queue queue() {
  return QueueBuilder.durable("my-queue")
	// .withArgument("x-dead-letter-exchange", "my-queue.dlq")
    .deadLetterExchange("my-queue.dlq") // 두 라인 모두 같은 의미이다.
    .build();
}

 

여기서 DLX(Dead Letter Exchange)는 Fanout으로 설정했는데 필요에 맞게 설정해주면 된다.

@Bean
public Queue deadLetterQueue() {
  return QueueBuilder.durable("my-queue.dlq").build();
}

@Bean
public FanoutExchange deadLetterExchange() {
  return ExchangeBuilder.fanoutExchange("my-exchange.dlx").build();
}

@Bean
public Binding deadLetterBinding() {
  return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

Parking Lot

주차장을 의미하는 이 큐에서는 좀 더 복잡한 상황을 위해 존재하는 큐이다.

예를 들면, 단순히 버릴 수 없어 직접 처리해야 하거나 특정 재시도 횟수를 넘은 메시지를 보관한다.

 

앞서 언급한 목적이 아니더라도 내부 컨벤션에 따라 기능을 정의하면 된다.

너무 오래 보관되거나 큐가 감당할 수 없는 크기까지 보관되지 않도록 유의해야 한다.


Error Handler

기존에 사용한 SimpleRabbitListenerContainerFactory는 기본적으로 ConditionalRejectingErrorHandler를 사용한다.

이 핸들러는 스프링의 기본 핸들러인 ErrorHandler를 상속받았고, 치명적인지 확인한 뒤 예외를 처리한다.

 

이 핸들러가 치명적이라고 판단하는 예외의 종류는 다음과 같다. (isCauseFatal()을 살펴보면 된다.)

  • MessageConversionException
  • MessageConversionException
  • MethodArgumentNotValidException
  • MethodArgumentTypeMismatchException
  • NoSuchMethodException
  • ClassCastException

FatalExceptionStrategy

핸들러가 치명적인 예외인지 FatalExceptionStrategyisFatal()을 호출함으로 확인한다.

@FunctionalInterface
public interface FatalExceptionStrategy {
  boolean isFatal(Throwable t);
}

ConditionalRejectingErrorHandler는 내부적으로 기본 전략을 구현해 두고 사용하고 있다.

앞서 언급한 예외 종류라면 핸들링하고, 아니라면 AmqpRejectAndDontRequeueException를 던지며 메시지를 거절한다.

 

따로 사용하는 예외가 있다면 다음과 같이 데코레이터 패턴을 사용해서 확장시킬 수 있다.

// ErrorHandler 자체를 상속한 뒤에 데코레이팅을 할 수도 있다.
public static class MyFatalExceptionStrategy implements FatalExceptionStrategy {
  private final FatalExceptionStrategy defaultExceptionStrategy = new ConditionalRejectingErrorHandler.DefaultExceptionStrategy();

  @Override
  public boolean isFatal(Throwable t) {
    return defaultExceptionStrategy.isFatal(t) || t.getCause() instanceof MyException; // 여기
  }
}

 

확장시킨 뒤 우리가 사용하는 리스너에 전역으로 설정해주면 된다.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
  var containerFactory = new SimpleRabbitListenerContainerFactory();
  containerFactory.setConnectionFactory(cachingConnectionFactory);
  containerFactory.setErrorHandler(new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy())); // 여기
  return containerFactory;
}

With Bean

앞서 언급한 방법은 전역으로 사용한다는 문제가 있다.

상황에 따라서 다르게 설정해 줄 수도 있는데 가장 먼저 언급할 내용은 Bean으로 사용하는 것이다.

 

다음과 같이 간단하게 빈으로 등록한 뒤, 적용시킬 리스너에 errorHandler로 지정해주면 된다.

@Bean
public RabbitListenerErrorHandler customErrorHandler() {
  return (message, message1, e) -> {
    log.info(e.getCause().toString());
    return null;
  };
}
@RabbitListener(queues = MY_QUEUE, errorHandler = "customErrorHandler")
public void receiveMessage(Message message) {
    // do something...
}

Implements Error Handler

아니면 전역적으로 정말 간단하게 등록하고 싶을 수가 있는데 그럴 때는 ErrorHandler를 구현해주면 된다.

Bean으로 구현하거나 람다로 구현한 뒤에 리스너 설정에 적용시키면 끝이다.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
  var containerFactory = new SimpleRabbitListenerContainerFactory();
  containerFactory.setConnectionFactory(cachingConnectionFactory);
    containerFactory.setErrorHandler(t -> log.info(t.getCause().toString())); // 여기
  return containerFactory;
}

 


-Reference:

https://docs.spring.io/spring-amqp/reference

 

😋 지극히 개인적인 블로그지만 댓글과 조언은 제 성장에 도움이 됩니다 😋

'Web > BackEnd' 카테고리의 다른 글

[Backend] flyway 사용하기  (0) 2023.10.11
[Backend] DB 인덱스 이해하기  (0) 2023.08.04
[Backend] 트랜잭션, 격리 수준  (0) 2023.07.27
[BackEnd] MapStruct 사용기  (3) 2023.05.13
[BackEnd] Enum 유효성 검사 구현기  (1) 2023.04.25

댓글