본문 바로가기
Backend

Aws SQS Spring으로 Listener Stop Start하기

by 뜨거운 개발자 2024. 8. 13.

이 글을 읽고 기본 SQS세팅은 하고 진행하시길 바랍니다.

https://haward.tistory.com/246

 

sqs spring boot 사용법

한국어로 된 블로그 중에 공식문서대로 설정한 사람이 거의 없는 것 같고 버전이 많이 달라져서 최신화된 글을 위해 작성합니다.이 글은 2024-8월에 동작이 확인된 글 입니다. SQS설정build.gradle//sq

haward.tistory.com

 

문제 상황

프로젝트를 진행하는데 라이엇 API 갯수가 초과됐을 때 SQS에 있는 데이터를 처리하지 않고 잠시 이벤트를 끄는게 필요하다.

따라서 SQS에 이벤트를 받는 이벤트 리스너를 종료하는 방법을 알아보자.

[이전 코드]

이전의 SqsListenerConfig 코드를 보면 

@Configuration
public class SqsListenerConfig {
    @Bean
    public SqsMessageListenerContainerFactory<Object> sqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
       return SqsMessageListenerContainerFactory.builder()
          .sqsAsyncClient(sqsAsyncClient)
          .configure(options -> options
             .maxConcurrentMessages(10)  // 최대 동시 메시지 처리 수
             .pollTimeout(Duration.ofSeconds(10)))  // 폴링 타임아웃 설정
          .build();
    }
}

다음과 같이 팩토리를 만들었고 그걸 내가 만든 컴포넌트에 씌우는 방식으로 진행했다.

 

일단 이전 코드는 MessageListener를 컴포넌트로 만들고 SqsListener 어노테이션을 사용해서 처리했다.

[이전코드]

package mejai.mejaigg.messaging.sqs.listener;

import org.springframework.stereotype.Component;

import io.awspring.cloud.sqs.annotation.SqsListener;

@Component
public class MessageListener {
    @SqsListener(value = "mejai-renewal-sqs", factory = "sqsListenerContainerFactory")
    public void listen(String message) {
       System.out.println("Received message: " + message);
    }
}

 

하지만 이번에는 

package mejai.mejaigg.messaging.sqs.config;

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer;
import mejai.mejaigg.messaging.sqs.listener.MyMessageListener;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Configuration
public class SqsListenerConfig {
    @Bean
    public SqsMessageListenerContainer<Object> sqsMessageListenerContainer(SqsAsyncClient sqsAsyncClient) {
       SqsMessageListenerContainerFactory<Object> factory = SqsMessageListenerContainerFactory.builder()
          .sqsAsyncClient(sqsAsyncClient)
          .configure(options -> options
             .maxConcurrentMessages(10)
             .pollTimeout(Duration.ofSeconds(10)))
          .build();
       // SqsMessageListenerContainer 생성
       SqsMessageListenerContainer<Object> container = factory.createContainer("mejai-renewal-sqs");

       // MessageListener 설정
       container.setMessageListener(new MyMessageListener());
       return container;
    }

}

다음과 같이 설정을 변경해준다.

그 이유는 SqsMessageListenerContainer 를 시작하고 종료하는 것을 직접 진행할 것이기 때문이다.

Factory를 사용하면 SqsMessageListenerContainer를 팩토리가 관리해서 직접적으로 중지를 시키기가 까다롭지만 이렇게 사용하면 중지를 더 쉽게 시킬 수 있다.

그렇다면 컨테이너의 메시지 리스너 같은 경우는 타입이 정해져 있기 때문에 다음과 같이 리스너 코드를 변경해준다.

package mejai.mejaigg.messaging.sqs.listener;

import org.springframework.messaging.Message;

import io.awspring.cloud.sqs.listener.MessageListener;

public class MyMessageListener implements MessageListener<Object> {

    @Override
    public void onMessage(Message<Object> message) {
       System.out.println("Received message: " + message.getPayload());
       // 메시지 처리 로직 추가
    }
}

SQS의 MessageListener를 상속 받아서 그걸 사용해줄 예정이다.

 

역시나 테스트를 위해 컨트롤러에 

@PostMapping("/start")
public String startListener() {
    listenerControlService.startListener();
    return "SQS Listener started.";
}

@PostMapping("/stop")
public String stopListener() {
    listenerControlService.stopListener();
    return "SQS Listener stopped.";
}

SQS를 켜고 끄는 걸 넣어줬다.

 

그리고 직접 켜고 끄는 것은 다음과 같이 해준다.

@Service
@RequiredArgsConstructor
@Slf4j
public class SqsListenerControlService {
    private final SqsMessageListenerContainer<Object> sqsMessageListenerContainer;

    public void stopListener() {
       sqsMessageListenerContainer.stop();
       log.info("SQS Listener has been stopped.");
    }

    public void startListener() {
       sqsMessageListenerContainer.start();
       log.info("SQS Listener has been started.");
    }

}

역시나 스웨거로 테스트 해보면 다음과 같이 Stop하고 성공하는 모습을 볼 수가 있다.

728x90