본문 바로가기
Backend

Aws SQS Spring 스캐줄러를 사용한 Listener 관리하기

by 뜨거운 개발자 2024. 10. 6.

이전에 SQS를 이용해서 리스너를 켜고 끄는 것 까지 진행해보았다.
SQS의 설정이 궁금하다면 이 게시물을 참고하고
https://haward.tistory.com/246

sqs spring boot 사용법

한국어로 된 블로그 중에 공식문서대로 설정한 사람이 거의 없는 것 같고 버전이 많이 달라져서 최신화된 글을 위해 작성합니다.이 글은 2024-8월에 동작이 확인된 글 입니다. SQS설정build.gradle//sq

haward.tistory.com

 
 
만약 리스너를 켜고 끄는 법을 알고 싶다면 이 게시물을 참고하자.
https://haward.tistory.com/247

Aws SQS Spring으로 Listener Stop Start하기

이 글을 읽고 기본 SQS세팅은 하고 진행하시길 바랍니다.https://haward.tistory.com/246 sqs spring boot 사용법한국어로 된 블로그 중에 공식문서대로 설정한 사람이 거의 없는 것 같고 버전이 많이 달라져

haward.tistory.com

 
 

할일

오늘의 요구사항은 다음과 같다.
메시지 수신 -> 서버에서 처리 -> 성공시 값을 갱신 -> Too many request로 실패한 경우 -> 리스너를 시간동안 꺼주고, 해당 시간 이후에 다시 API를 호출.

1. 명시적으로 메시지 삭제하기

SQS에서 받은 메시지를 수동으로 삭제하는 것이 아니라,  자동으로 메시지가 삭제되는 문제가 있었다. 그래서 필요한 것은 메시지가 자동으로 삭제되는 게 아니라 명시적으로 삭제하는 로직이 필요하다.

@Configuration
@RequiredArgsConstructor
@Slf4j
public class SqsListenerConfig {
    private final MyMessageListener myMessageListener;

    @Bean
    public SqsMessageListenerContainer<Object> sqsMessageListenerContainer(SqsAsyncClient sqsAsyncClient) {
       SqsMessageListenerContainerFactory<Object> factory = SqsMessageListenerContainerFactory.builder()
          .sqsAsyncClient(sqsAsyncClient)
          .configure(options -> options
             .acknowledgementMode(AcknowledgementMode.MANUAL) // 수동 확인 모드
             .maxConcurrentMessages(10) // 최대 동시 메시지 처리 수
             .pollTimeout(Duration.ofSeconds(10))) // 폴링 타임아웃
          .build();
       // SqsMessageListenerContainer 생성
       SqsMessageListenerContainer<Object> container = factory.createContainer("mejai-renewal-sqs");

       // MessageListener 설정
       container.setMessageListener((message) -> {
          try {
             myMessageListener.onMessage(message); // 메시지 처리
          } catch (Exception e) {
             log.warn("Failed to process message: " + e.getMessage());
             SqsListenerControlService.requestStop();
             log.info("SQS Listener has been stopped.");
          }
       });
       return container;
    }

이전에 만들었던 SqsListenerConfig 같은 경우 다음과 같이 변경해주었다.
가장 큰 변경사항은 acknowledgementMode(AcknowledgementMode.MANUAL)이다.
https://docs.awspring.io/spring-cloud-aws/docs/3.0.1/apidocs/io/awspring/cloud/sqs/listener/acknowledgement/handler/AcknowledgementMode.html#MANUAL

AcknowledgementMode (Spring Cloud AWS 3.0.1 API)

All Implemented Interfaces: Serializable, Comparable , Constable Configures the acknowledgement behavior for this container. Since: 3.0 Author: Tomaz Fernandes See Also: Nested Class Summary Enum Constant Summary Enum Constants Messages will be acknowledge

docs.awspring.io

 
아래 링크를 참고해보면 Mode의 옵션은 3가지가 있는데 각각의 옵션의 의미는 다음과 같다.

  • ALWAYS: 성공이든 실패든 메시지를 처리했으면 큐에서 삭제가 되는 방법.
  • MANUAL: 메시지를 자동으로 삭제하지 않는다. 리스너 메서드는 승인 매개 변수를 사용하여 각 메시지를 수동으로 승인해야한다.
  • ON_SUCCESS: 수신자 메서드가 성공적으로 실행되면 메시지를 삭제합니다. 수신자 메서드에서 예외가 발생하면 메시지가 삭제되지 않는다.

아무것도 설정해주지 않으면 기본 모드는 ON_SUCCESS 모드이다.
우리는 직접 삭제를 해주고 싶기 때문에 수동확인모드인 MANUAL 모드로 바꿔줬다.

2. 실패시 메시지 삭제하기

이제 메시지를 직접 삭제해야하는데 이 부분에서 많이 애를 먹었다.
사실 많은 문서에 Acknowledgement를 이용해서 메시지를 삭제해야한다고 한다.
실제로 GPT나 블로그 등 다양한 글에서 그렇게 말을했지만 공식문서도 그렇고 코드를 찾아봐도 어디에서 리스너를 사용한 방식에 Acknowledgement 객체를 사용하는 걸 보기는 어려웠다.
 
처음에는 메시지 헤더에 Acknowlegement 객체가 존재한다고 해서 다음과 같이 찾아주려고 했었다.

Acknowledgement acknowledgment = message.getHeaders()
	.get(Acknowledgement.class.getName(), Acknowledgement.class);
if (acknowledgment != null) {
	acknowledgment.acknowledge();
	log.info("Message acknowledged (deleted) successfully.");
}

그러나 항상 null을 반환해서, 찾을 수 없었다. 즉 위의 이 코드로는 동작하지 않았다.
그래서 나는 SqsClient를 이용해서 직접 삭제 요청을 보내는 방법으로 이 문제를 해결했다.
 
 
 

@RequiredArgsConstructor
@Component
@Slf4j
public class MyMessageListener implements MessageListener<Object> {
	private final ObjectMapper objectMapper;
	private final ProfileService profileService;
	private final StreakService streakService;
	private final SqsAsyncClient sqsAsyncClient;

	@Override
	public void onMessage(Message<Object> message) {
		try {
			String payload = (String)message.getPayload(); // 메시지 페이로드를 String으로 변환

			// 메시지 타입에 따라 처리
			if (payload.contains("year") && payload.contains("month")) {
				UserStreakRequest request = objectMapper.readValue(payload, UserStreakRequest.class);
				streakService.refreshStreak(request);
			} else {
				UserProfileRequest request = objectMapper.readValue(payload, UserProfileRequest.class);
				profileService.refreshUserProfileByNameTag(request.getId(),
					request.getTag());
			}

			// ACK 처리
			String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class);
			if (receiptHandle != null) {
				deleteMessage(receiptHandle);
				log.info("Message deleted successfully.");
			} else {
				log.warn("Receipt handle not found for the message.");
			}
        }
        catch(Exception e)
        {
        	//TODO: 예외처리 필요.
        }
...
	private void deleteMessage(String receiptHandle) {
		// 메시지 삭제 요청
		DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
			.queueUrl("mejai-renewal-sqs") // SQS 큐 URL
			.receiptHandle(receiptHandle)
			.build();

		sqsAsyncClient.deleteMessage(deleteMessageRequest).whenComplete((result, exception) -> {
			if (exception != null) {
				log.error("Failed to delete message: {}", exception.getMessage());
			} else {
				log.info("Message successfully deleted from the queue.");
			}
		});
	}
}

코드를 확인해보면 SqsAcyncClient를 주입해서 사용하는 것을 볼 수 있다.
이 과정에서 Sqs Config에서 SqsListenerControlService를 이용해서 stop을 하려고 하는 상황 때문에 순환참조가 발생해, 전역 Context로 빼주었다.
 

3. 특정 에러에 대해서 스캐줄러 걸어주기

사실 다른 에러 같은 경우에는 오히려 무시하고 큐에서 pop을 해줘야만 하고, 꼭 스캐줄러를 꺼야하는 경우는 라이엇 서버가 too many request라고 했을 때 그 시간동안만 꺼야한다.
기본적으로 2개의 fegin 클라이언트를 보면 형식이 거의 유사하기 때문에 한개의 코드로 설명을 진행하겠다.

@FeignClient(
    name = "RiotKrClient",
    url = "https://kr.api.riotgames.com",
    fallbackFactory = RiotKrClient.RiotKrClientFallback.class
)
public interface RiotKrClient {

    @Retryable(
       value = {ServerErrorException.class}, // 재시도할 예외 지정
       maxAttempts = 3, // 최대 재시도 횟수
       backoff = @Backoff(delay = 2000) // 재시도 간격
    )
    @GetMapping("/lol/summoner/v4/summoners/by-puuid/{puuid}")
    SummonerDto getSummonerByPuuuid(@RequestParam("api_key") String riotKey, @PathVariable String puuid);

    @Retryable(
       value = {ServerErrorException.class}, // 재시도할 예외 지정
       maxAttempts = 3, // 최대 재시도 횟수
       backoff = @Backoff(delay = 2000) // 재시도 간격
    )
    @GetMapping("/lol/league/v4/entries/by-summoner/{summonerId}")
    Set<RankDto> getRankBySummonerId(@RequestParam("api_key") String riotKey, @PathVariable String summonerId);

    @Deprecated
    @GetMapping("/lol/summoner/v4/summoners/by-name/{name}")
    SummonerDto getSummerByName(@RequestParam("api_key") String riotKey, @PathVariable String name);

    @Slf4j
    @Component
    class RiotKrClientFallback implements FallbackFactory<RiotKrClient> {
       @Override
       public RiotKrClient create(Throwable cause) {
          if (cause instanceof FeignException.TooManyRequests) {
             // 429 오류 처리
             log.warn("429 Too Many Requests: {}", cause.getMessage());
             throw new ClientException(ClientErrorCode.TOO_MANY_REQUESTS);
          } else {
             // 그 외의 오류 처리
             log.error("FeignClient 오류: {}", cause.getMessage());
             throw new ClientException(ClientErrorCode.INTERNAL_SERVER_ERROR);
          }
       }
    }

}

다음과 같이 생겼기 때문에 429가 발생했을 때 RiotKrClientFallback에서 client에러로 만들어서 throw 해주기 때문에 이제 이걸 catch해서 처리하면 된다.
MyMessageListener에서 try 문 아래에 아까 구현하지 않은 catch문을 넣어준다.

catch (ClientException e) {
    // 특정 에러 코드가 TOO_MANY_REQUESTS인 경우에만 처리
    if (e.getClientErrorCode() == ClientErrorCode.TOO_MANY_REQUESTS) {
       // TOO_MANY_REQUESTS 에러에 대한 처리 로직
       log.info("Too many requests. Waiting for a while.");
       SqsListenerControlService.requestStop();

    } else { // 그 외의 경우에는 잘못된 형식이 온 것이므로 큐에서 제거.
       String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class);
       deleteMessage(receiptHandle);
    }
} catch (Exception e) {
    // 예외 처리
    log.warn("Failed to process message: " + e.getMessage());
    SqsListenerControlService.requestStop();
    log.info("SQS Listener has been stopped.");
    //TODO: 스캐줄러 걸어서 특정 시간 이후에 켜주기
}

 
현재 다음과 같이 ClientException이 발생했을 때, 에러의 종류에 따라 Too Many Request가 났을 때 SqsListener를 꺼준다.
SqsLisener 코드는 이전 게시물과는 조금 다르게 전역 Context로 빼주었다.

@Service
@RequiredArgsConstructor
@Slf4j
public class SqsListenerControlService implements ApplicationContextAware {
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext ctx) {
       context = ctx;
    }

    public static void requestStop() {
       SqsMessageListenerContainer<?> container = context.getBean(SqsMessageListenerContainer.class);
       container.stop();
       log.info("SQS Listener has been stopped due to an error during message processing.");
    }

    public static void requestStart() {
       SqsMessageListenerContainer<?> container = context.getBean(SqsMessageListenerContainer.class);
       container.start();
       log.info("SQS Listener has been started.");
    }
}

그리고 이후에 알아서 스캐줄러를 걸어줄 생각이었지만 굳이 그렇게 구현할 필요가 없었다.
그 이유는 스캐줄러를 걸어서 SQSListener를 켜줘도 켜지지 않는 경우가 발생했다.(리스너를 끄는데 실패하는 경우 등)
따라서 무슨 이유로 SQSListener가 꺼진다면, 그걸 지속적으로 켜줘야만 했다.
그래서 Cron을 걸어서 매 1분마다 Listener를 켜줬다.
왜냐하면 리스너가 켜져있는 상태에서 다시 켜는 요청은 무시되기 때문이다.

@Configuration
@EnableScheduling
@Component
public class CronConfig {
    @Scheduled(cron = "0 * * * * *")  // 1분마다 실행
    public void cron() {
       SqsListenerControlService.requestStart();
    }
}

그래서 이렇게 간단하게 cron으로 리스너를 켜줬다.

최종 MyMessageListener

@RequiredArgsConstructor
@Component
@Slf4j
public class MyMessageListener implements MessageListener<Object> {
    private final ObjectMapper objectMapper;
    private final ProfileService profileService;
    private final StreakService streakService;
    private final SqsAsyncClient sqsAsyncClient;

    @Override
    public void onMessage(Message<Object> message) {
       try {
          String payload = (String)message.getPayload(); // 메시지 페이로드를 String으로 변환

          // 메시지 타입에 따라 처리
          if (payload.contains("year") && payload.contains("month")) {
             UserStreakRequest request = objectMapper.readValue(payload, UserStreakRequest.class);
             streakService.refreshStreak(request);
          } else {
             UserProfileRequest request = objectMapper.readValue(payload, UserProfileRequest.class);
             profileService.refreshUserProfileByNameTag(request.getId(),
                request.getTag());
          }

          // ACK 처리
          String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class);
          if (receiptHandle != null) {
             deleteMessage(receiptHandle);
             log.info("Message deleted successfully.");
          } else {
             log.warn("Receipt handle not found for the message.");
          }

       } catch (ClientException e) {
          // 특정 에러 코드가 TOO_MANY_REQUESTS인 경우에만 처리
          if (e.getClientErrorCode() == ClientErrorCode.TOO_MANY_REQUESTS) {
             // TOO_MANY_REQUESTS 에러에 대한 처리 로직
             log.info("Too many requests. Waiting for a while.");
             SqsListenerControlService.requestStop();

          } else { // 그 외의 경우에는 잘못된 형식이 온 것이므로 큐에서 제거.
             String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class);
             deleteMessage(receiptHandle);
          }
       } catch (Exception e) {
          // 예외 처리
          log.warn("Failed to process message: " + e.getMessage());
          SqsListenerControlService.requestStop();
          log.info("SQS Listener has been stopped.");
          //TODO: 스캐줄러 걸어서 특정 시간 이후에 켜주기
       }
    }

    private void deleteMessage(String receiptHandle) {
       // 메시지 삭제 요청
       DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
          .queueUrl("mejai-renewal-sqs") // SQS 큐 URL
          .receiptHandle(receiptHandle)
          .build();

       sqsAsyncClient.deleteMessage(deleteMessageRequest).whenComplete((result, exception) -> {
          if (exception != null) {
             log.error("Failed to delete message: {}", exception.getMessage());
          } else {
             log.info("Message successfully deleted from the queue.");
          }
       });
    }
}

최종 코드는 다음과 같다.
전체 코드는 이곳에서 확인 가능하다.
https://github.com/mejaiKR/Backend

GitHub - mejaiKR/Backend

Contribute to mejaiKR/Backend development by creating an account on GitHub.

github.com

 
최종적으로 우리가 제작한 아키텍쳐의 구성을 완성하였다.

728x90