이 글을 읽고 기본 SQS세팅은 하고 진행하시길 바랍니다.
https://haward.tistory.com/246
문제 상황
프로젝트를 진행하는데 라이엇 API 갯수가 초과됐을 때 SQS에 있는 데이터를 처리하지 않고 잠시 이벤트를 끄는게 필요하다.
따라서 SQS에 이벤트를 받는 이벤트 리스너를 종료하는 방법을 알아보자.
[이전 코드]
이전의 SqsListenerConfig 코드를 보면
@Configuration
public class SqsListenerConfig {
@Bean
public SqsMessageListenerContainerFactory<Object> sqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
return SqsMessageListenerContainerFactory.builder()
.sqsAsyncClient(sqsAsyncClient)
.configure(options -> options
.maxConcurrentMessages(10) // 최대 동시 메시지 처리 수
.pollTimeout(Duration.ofSeconds(10))) // 폴링 타임아웃 설정
.build();
}
}
다음과 같이 팩토리를 만들었고 그걸 내가 만든 컴포넌트에 씌우는 방식으로 진행했다.
일단 이전 코드는 MessageListener를 컴포넌트로 만들고 SqsListener 어노테이션을 사용해서 처리했다.
[이전코드]
package mejai.mejaigg.messaging.sqs.listener;
import org.springframework.stereotype.Component;
import io.awspring.cloud.sqs.annotation.SqsListener;
@Component
public class MessageListener {
@SqsListener(value = "mejai-renewal-sqs", factory = "sqsListenerContainerFactory")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
하지만 이번에는
package mejai.mejaigg.messaging.sqs.config;
import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer;
import mejai.mejaigg.messaging.sqs.listener.MyMessageListener;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
@Configuration
public class SqsListenerConfig {
@Bean
public SqsMessageListenerContainer<Object> sqsMessageListenerContainer(SqsAsyncClient sqsAsyncClient) {
SqsMessageListenerContainerFactory<Object> factory = SqsMessageListenerContainerFactory.builder()
.sqsAsyncClient(sqsAsyncClient)
.configure(options -> options
.maxConcurrentMessages(10)
.pollTimeout(Duration.ofSeconds(10)))
.build();
// SqsMessageListenerContainer 생성
SqsMessageListenerContainer<Object> container = factory.createContainer("mejai-renewal-sqs");
// MessageListener 설정
container.setMessageListener(new MyMessageListener());
return container;
}
}
다음과 같이 설정을 변경해준다.
그 이유는 SqsMessageListenerContainer 를 시작하고 종료하는 것을 직접 진행할 것이기 때문이다.
Factory를 사용하면 SqsMessageListenerContainer를 팩토리가 관리해서 직접적으로 중지를 시키기가 까다롭지만 이렇게 사용하면 중지를 더 쉽게 시킬 수 있다.
그렇다면 컨테이너의 메시지 리스너 같은 경우는 타입이 정해져 있기 때문에 다음과 같이 리스너 코드를 변경해준다.
package mejai.mejaigg.messaging.sqs.listener;
import org.springframework.messaging.Message;
import io.awspring.cloud.sqs.listener.MessageListener;
public class MyMessageListener implements MessageListener<Object> {
@Override
public void onMessage(Message<Object> message) {
System.out.println("Received message: " + message.getPayload());
// 메시지 처리 로직 추가
}
}
SQS의 MessageListener를 상속 받아서 그걸 사용해줄 예정이다.
역시나 테스트를 위해 컨트롤러에
@PostMapping("/start")
public String startListener() {
listenerControlService.startListener();
return "SQS Listener started.";
}
@PostMapping("/stop")
public String stopListener() {
listenerControlService.stopListener();
return "SQS Listener stopped.";
}
SQS를 켜고 끄는 걸 넣어줬다.
그리고 직접 켜고 끄는 것은 다음과 같이 해준다.
@Service
@RequiredArgsConstructor
@Slf4j
public class SqsListenerControlService {
private final SqsMessageListenerContainer<Object> sqsMessageListenerContainer;
public void stopListener() {
sqsMessageListenerContainer.stop();
log.info("SQS Listener has been stopped.");
}
public void startListener() {
sqsMessageListenerContainer.start();
log.info("SQS Listener has been started.");
}
}
역시나 스웨거로 테스트 해보면 다음과 같이 Stop하고 성공하는 모습을 볼 수가 있다.
728x90
'Backend' 카테고리의 다른 글
[k8s] kubectl 명령어가 동작하지 않을 때 (왜 쿠버네티스는 스왑 메모리 사용을 허용하지 않는가?) (1) | 2024.09.17 |
---|---|
Spring Boot에서 Swagger를 통한 API 문서화 설정 가이드 (2) | 2024.09.13 |
sqs spring boot 사용법 (0) | 2024.08.13 |
[문제해결] Spring webSocket Test 삽질 일기 (생성자 직렬화 문제) (0) | 2024.08.07 |
Spring Security 단위 테스트 (0) | 2024.08.07 |