이 글은 문제 해결을 위해 삽질을 적어놓은 글 입니다.
소켓 테스트를 짜는 분이 있다면 도움이 될 수도 있기 때문에 코드 위주로 첨부하도록 하겠습니다.
어떤 문제인지만 궁금하다면 해결 방법에 가서 보시면 됩니다.
문제 상황 및 세팅
간단한 시그널링 테스트를 진행 하고 있었는데 어이없는 상황에 부딪혔다.
코드를 보면 알겠지만, 웹소켓 통합 테스트를 위해서 다음과 같은 세팅을 해뒀다.
1. sender와 receiver를 분리해서 2개의 엔티티를 만들어준다.
2. workspace를 만들어서 각 유저를 해당 워크스페이스에 가입을 먼저 하고 진행한다.
거기까지가 이 부분이다.
성공한 테스트 상황
[세팅]
@BeforeEach
public void setup() {
MockitoAnnotations.openMocks(this);
this.WEBSOCKET_URI = "ws://localhost:" + port + "/ws-chat";
this.WEBSOCKET_TOPIC = "/app";
this.stompClient = new WebSocketStompClient(new StandardWebSocketClient());
this.stompClient.setMessageConverter(new MappingJackson2MessageConverter());
sender = User.builder()
.email("wnddms12345@gmail.com")
.profileImage("profileImage")
.provider("local")
.userName("userName")
.role(USER)
.build();
WorkSpace workSpace = new WorkSpace();
workSpace.setName("workSpaceName");
workSpaceRepository.save(workSpace);
sender.setWorkSpace(workSpace);
userRepository.save(sender);
receiver = User.builder()
.email("wnddms12345@gmail.com")
.profileImage("profileImage")
.provider("local")
.userName("userName")
.role(USER)
.build();
receiver.setWorkSpace(workSpace);
userRepository.save(receiver);
accessTokenExpiredDate = new Date(new Date().getTime() + 10000000L);
}
테스트를 하기 위해서 웹소켓에 연결을 진행하는 함수를 짜줬다. 일반 연결과 STOMP와 연결에 둘 다 JWT 인증은 진행하기 때문에 다음과 같은 모양으로 코드를 짜줬다.
[웹소켓 연결 함수]
private StompSession connectWebSocket(User user) throws
InterruptedException,
ExecutionException {
System.out.println("유저 ID: " + user.getId());
String token = tokenProvider.generateAccessToken(user, accessTokenExpiredDate);
System.out.println("임시 엑세스 토큰 생성: " + token);
//연결 시 사용하는 JWT 설정
WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
headers.add("Authorization", "Bearer " + token);
//STOMP 사용시 사용하는 JWT 설정
StompHeaders stompHeaders = new StompHeaders();
stompHeaders.add("Authorization", "Bearer " + token);
//STOMP 연결
StompSession session = stompClient.connectAsync(WEBSOCKET_URI, headers, stompHeaders,
new StompSessionHandlerAdapter() {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
System.out.println("STOMP 연결 성공");
}
}).get();
return session;
}
다음으로는 아래와 같이 테스트 코드를 짜줬다.
[테스트]
@Test
@DisplayName("소켓으로 연결된 유저에게 ice 메시지 전송할 수 있다.")
public void testIce() throws InterruptedException, ExecutionException, TimeoutException {
BlockingQueue<IceDto> messageQueue = new LinkedBlockingQueue<>();
IceDto iceDto = new IceDto();
iceDto.setType(ScreenShare);
iceDto.setCandidate("candidate");
iceDto.setSdpMid("sdpMid");
iceDto.setSdpMLineIndex("sdpMLineIndex");
iceDto.setUserId(receiver.getId().toString());
//유저 2명 연결 1이 송신 2가 수신.
StompSession senderSession = connectWebSocket(sender);
StompSession receiverSession = connectWebSocket(receiver);
receiverSession.subscribe("/user/queue/ice/123", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return IceDto.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
System.out.println("ice 응답값 : " + payload);
messageQueue.offer((IceDto)payload);
}
});
StompHeaders stompHeaders = new StompHeaders();
stompHeaders.setDestination(WEBSOCKET_TOPIC + "/ice/123");
senderSession.send(stompHeaders, iceDto);
IceDto receivedMessage = messageQueue.poll(5, TimeUnit.SECONDS);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getUserId()).isEqualTo(sender.getId().toString());
}
컨트롤러 / 서비스 코드
이 테스트를 돌리면 아래의 컨트롤러와 서비스 코드가 검증이 된다.
[컨트롤러]
@Controller
@RequiredArgsConstructor
public class SignalingController {
private final SignalingService signalingService;
@MessageMapping("/ice/{roomId}")
public void ice(@DestinationVariable String roomId, IceDto iceDto, SimpMessageHeaderAccessor headerAccessor) {
System.out.println("ice Controller");
Long userId = (Long)headerAccessor.getSessionAttributes().get("userId");
System.out.println("userId : " + userId);
signalingService.sendIce(roomId, iceDto, userId);
}
...
}
[서비스 코드]
/**
* Ice 메시지를 상대방에게 전송합니다.(목적지는 메시지를 확인해서 소켓 ID를 찾아서 전송합니다.)
* @param roomId 참여중인 화상회의방 ID
* @param iceDto Ice 메시지 DTO
* @param senderId 보내는 사람 ID (내 Id)
*/
public void sendIce(String roomId, IceDto iceDto, Long senderId) {
String destId = iceDto.getUserId();
String socketId = socketRegistry.getSocketId(destId);
iceDto.setUserId(senderId.toString()); // 보내는 사람 ID로 갈아 껴줌.
iceDto.setType(iceDto.getType());
System.out.println("보낸 사람 : " + senderId);
System.out.println("보낸 사람 : " + socketRegistry.getSocketId(senderId.toString()));
System.out.println("받는 사람 : " + destId);
System.out.println("받는 사람 : " + socketRegistry.getSocketId(destId));
simpMessagingTemplate.convertAndSendToUser(socketId, "/queue/ice/" + roomId, iceDto);
}
문제가 생긴 테스트 코드
세팅과 연결 부분은 위에서 설명한 부분과 같다.
하지만 문제가 생긴부분은 offer 쪽 테스트였다.
@Test
@DisplayName("offer 테스트")
public void testOffer() throws InterruptedException, ExecutionException, TimeoutException {
BlockingQueue<SdpResponseDto> messageQueue = new LinkedBlockingQueue<>();
SdpMessageDto sdpMessageDto = new SdpMessageDto();
sdpMessageDto.setUserId(receiver.getId().toString());
sdpMessageDto.setType(ScreenShare);
SdpDto sdpDto = new SdpDto();
sdpDto.setSdp("sdp");
sdpDto.setType("offer");
sdpMessageDto.setSessionDescription(sdpDto);
//유저 2명 연결 1이 송신 2가 수신.
StompSession senderSession = connectWebSocket(sender);
StompSession receiverSession = connectWebSocket(receiver);
receiverSession.subscribe("/user/queue/offer/123", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return SdpResponseDto.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
System.out.println("offer 응답값 : " + payload);
messageQueue.offer((SdpResponseDto)payload);
}
});
StompHeaders stompHeaders = new StompHeaders();
stompHeaders.setDestination(WEBSOCKET_TOPIC + "/offer/123");
senderSession.send(stompHeaders, sdpMessageDto);
SdpResponseDto receivedMessage = messageQueue.poll(5, TimeUnit.SECONDS);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getUserInfo().getUserId()).isEqualTo(sender.getId().toString());
}
테스트의 로직은 위에서 보여준 로직과 다르지 않다.
이 코드는 아래 컨트롤러와 서비스를 검증하기 위해 작성했다.
@MessageMapping("/offer/{roomId}")
public void offer(@DestinationVariable String roomId, SdpMessageDto sdpMessageDto,
SimpMessageHeaderAccessor headerAccessor) {
Long userId = (Long)headerAccessor.getSessionAttributes().get("userId");
signalingService.sendOffer(roomId, sdpMessageDto, userId);
}
/**
* Offer 메시지를 상대방에게 전송합니다.(목적지는 메시지를 확인해서 소켓 ID를 찾아서 전송합니다.)
* /user/queue/offer/{roomId} 로 전송합니다.
* @param roomId 참여중인 화상회의방 ID
* @param sdpMessageDto Offer 메시지 DTO
* @param senderId 보내는 사람 ID (내 Id)
*/
public void sendOffer(String roomId, SdpMessageDto sdpMessageDto, Long senderId) {
String destId = sdpMessageDto.getUserId();
String socketId = socketRegistry.getSocketId(destId);
SdpResponseDto sdpResponseDto = SdpResponseDto.builder()
.userInfo(userService.getUserInfo(senderId))
.sessionDescription(sdpMessageDto.getSessionDescription())
.type(sdpMessageDto.getType())
.build();
System.out.println("보낸 사람 : " + senderId);
System.out.println("보낸 사람 : " + socketRegistry.getSocketId(senderId.toString()));
System.out.println("받는 사람 : " + destId);
System.out.println("받는 사람 : " + socketRegistry.getSocketId(destId));
System.out.println(sdpResponseDto);
simpMessagingTemplate.convertAndSendToUser(socketId, "/queue/offer/" + roomId, sdpResponseDto);
}
하지만 같은 로직이었음에도 큐에서 5초간 기다려도 아무런 응답이 없었다.
즉 요청을 소켓으로 응답이 오지 않는 상황이 발생했다.
해결 방안 탐색
이런 기묘한 상황을 해결하기 위해 발생할 수 있는 문제를 의심해봤다.
1. 이벤트 명을 잘못 적어서 오류가 발생한다.
2. socketRegistry 에 socketId 가 모종의 이유로 여러개의 소켓이 연결되거나, 덮어씌워져서 다른 socketId로 메시지를 보내고 있다.
3. 로직 코드에 문제가 있다.
4. 테스트 코드에 문제가 있다.
일단 1번은 눈이 빠지게 코드를 대조해봐도 아무런 문제가 없었기 때문에 패스한다.
2번 문제 같은 경우에는, 검증을 위해 StompPreHandler를 확인해서 진행했다.
SocketRegistry 는 많은 소켓을 관리하지는 않기 때문에 다음과 같은 간단한 구조를 가지고 있다.
1. 소켓 연결이 여러개 되는 문제 의심
[SocketRegistry]
@Component
@RequiredArgsConstructor
@Getter
public class SocketRegistry {
private final Map<String, String> userIdToSocketIdMap = new ConcurrentHashMap<>();
public void registerSession(String userId, String socketId) {
userIdToSocketIdMap.put(userId, socketId);
}
public String getSocketId(String userId) {
return userIdToSocketIdMap.get(userId);
}
public void unregisterSession(String userId) {
userIdToSocketIdMap.remove(userId);
}
}
소켓 세션을 등록하는 부분은 오직 한 부분으로 아래 코드에 있었다.
[StompPreHandler]
@Configuration
@Slf4j
public class StompPreHandler implements ChannelInterceptor {
private final TokenProvider tokenProvider;
private final ThreadPoolTaskScheduler taskScheduler;
private final SocketRegistry socketRegistry;
private final UserRepository userRepository;
StompPreHandler(TokenProvider tokenProvider, SocketRegistry socketRegistry,
UserRepository userRepository) {
this.tokenProvider = tokenProvider;
taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.initialize();
this.socketRegistry = socketRegistry;
this.userRepository = userRepository;
}
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
System.out.println("preSend CONNECT가 실행");
String token = accessor.getFirstNativeHeader("Authorization");
System.out.println("token: " + token);
if (token == null || !token.startsWith("Bearer ")) {
throw new AuthException(AuthErrorCode.INVALID_ACCESS_TOKEN);
}
token = token.substring(7); // Remove "Bearer " prefix
if (!tokenProvider.validateToken(token)) {
throw new AuthException(AuthErrorCode.INVALID_ACCESS_TOKEN);
}
Long userId = tokenProvider.getUserId(token);
Long expiryTime = tokenProvider.getExpiryFromToken(token); //나중에 만료시간 설정해서 스캐줄러에 넣으려고
System.out.println("userId: " + userId);
System.out.println("socketId: " + accessor.getUser().getName());
socketRegistry.registerSession(userId.toString(), accessor.getUser().getName());
accessor.getSessionAttributes().put("userId", userId);
accessor.getSessionAttributes().put("expiryTime", expiryTime);
userRepository.findById(userId).ifPresent(user -> {
WorkSpace workSpace = user.getWorkSpace();
if (workSpace != null) {
accessor.getSessionAttributes().put("workSpaceId", workSpace.getId());
} else {
throw new AuthException(AuthErrorCode.INVALID_WORKSPACE_STATE_USER);
}
});
System.out.println("워크 스페이스 아이디: " + accessor.getSessionAttributes().get("workSpaceId"));
scheduleSessionExpiry(accessor, expiryTime);
}
return message;
}
private void scheduleSessionExpiry(StompHeaderAccessor accessor, Long expiryTime) {
Instant expireAt = Instant.ofEpochMilli(expiryTime); // 밀리초 단위의 타임스탬프를 Instant 객체로 변환
System.out.println("Session expiry scheduled for sessionId: " + accessor + " at: " + expireAt);
taskScheduler.schedule(() -> {
accessor.getSessionAttributes().clear();
System.out.println("Session expired and automatically closed for sessionId: 세션 만료!!");
}, expireAt);
}
}
여기서 userId와 socketId를 print해줌으로서, 유저가 두번 등록되거나, 덮어씌워지는 상황이 발생하는지 확인해보았다.
다음과 같이 userId 1과 2에 해당하는 socketId를 봤고 이것이 전송 직전의 socketId와 같은지 대조해보았다.
하지만 슬프게도 같은 socketId를 가지고 있었다.
2. 구독이 실패하는지 의심
receiverSession.subscribe("/user/queue/offer/123", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
System.out.println("구독 시작: /user/queue/offer/123");
return SdpResponseDto.class;
}
하지만 너무 멀쩡하게 잘 나왔고.
그래서 일단 이 코드는 실제로 잘 작동하는 문제가 없는 코드였기 때문에 테스트 쪽에 문제가 있다고 확신을 가지고 찾기 시작했다.
3. 문제발견 : 받아오는 데이터 확인해보기
@Test
@DisplayName("offer 테스트")
public void testOffer() throws InterruptedException, ExecutionException, TimeoutException {
BlockingQueue<Object> messageQueue = new LinkedBlockingQueue<>();
SdpMessageDto sdpMessageDto = new SdpMessageDto();
sdpMessageDto.setUserId(receiver.getId().toString());
sdpMessageDto.setType(ScreenShare);
SdpDto sdpDto = new SdpDto();
sdpDto.setSdp("sdp");
sdpDto.setType("offer");
sdpMessageDto.setSessionDescription(sdpDto);
//유저 2명 연결 1이 송신 2가 수신.
StompSession senderSession = connectWebSocket(sender);
StompSession receiverSession = connectWebSocket(receiver);
receiverSession.subscribe("/user/queue/offer/123", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
System.out.println("구독 시작: /user/queue/offer/123");
return Object.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
System.out.println("offer 응답값 : " + payload);
messageQueue.offer(payload);
}
});
StompHeaders stompHeaders = new StompHeaders();
stompHeaders.setDestination(WEBSOCKET_TOPIC + "/offer/123");
senderSession.send(stompHeaders, sdpMessageDto);
Object receivedMessage = messageQueue.poll(5, TimeUnit.SECONDS);
assertThat(receivedMessage).isNotNull();
// assertThat(receivedMessage.getUserInfo().getUserId()).isEqualTo(sender.getId().toString());
}
현재 BlockingQueue에 SdpResponseDto 타입으로 받고 있었는데 일단 응답이 왜 안오는지 확인하기 위해서 타입을 Object로 바꿔서 확인해봤다.
출력된 타입 [B는 Java에서 "바이트 배열(byte array)"을 의미하기 때문에 해당 값을 받아서 직렬화를 해주기로 생각했다.
@Override
public void handleFrame(StompHeaders headers, Object payload) {
System.out.println("offer 응답값 : " + payload);
System.out.println("offer 응답값 클래스 : " + payload.getClass().getName());
if (payload instanceof byte[]) {
byte[] byteArray = (byte[])payload;
try {
SdpResponseDto messageDto = objectMapper.readValue(byteArray, SdpResponseDto.class);
System.out.println("offer 응답값 변환된 객체 : " + messageDto);
messageQueue.offer(messageDto);
} catch (Exception e) {
System.out.println("예상치 못한 타입의 응답값: " + payload.getClass().getName());
e.printStackTrace();
}
} else {
System.out.println("예상치 못한 타입의 응답값: " + payload.getClass().getName());
messageQueue.offer(payload); // 여전히 대기열에 추가하여 테스트를 진행
}
}
하지만 여기서 오류가 또 발생했는데, 메시지를 읽어보면 기본 생성자가 없기 때문에 deserialize를 실패하는 문제였다.
문제 원인 파악 및 해결
@Data
@Schema(description = "offer 또는 answer를 받을 때 사용하는 DTO")
public class SdpResponseDto {
@Schema(description = "메시지를 보낸 사용자 정보(즉 자신이 아닌 통신하고 있는 유저 id)")
private UserInfoDto userInfo;
@Schema(description = "offer description 즉 createOffer 함수 이벤트 description 값을 그대로 여기 넣어서 보내면 됩니다.")
private SdpDto sessionDescription;
@Schema(description = "메시지 타입 \n"
+ "ScreenShare | Video", example = "ScreenShare")
private SignalMessageType type;
@Builder
public SdpResponseDto(UserInfoDto userInfo, SdpDto sessionDescription, SignalMessageType type) {
this.userInfo = userInfo;
this.sessionDescription = sessionDescription;
this.type = type;
}
}
문제는 @Builder를 사용했기 때문에 기본 생성자가 없고, Jackson이 기본 생성자가 없으면 객체를 생성할 수 없다는 점이었다. Jackson은 역직렬화할 때 기본 생성자를 필요로 한다. @Builder를 사용하면서도 Jackson이 사용할 수 있도록 하기 위해 기본 생성자를 추가하거나, Jackson이 빌더 패턴을 이해할 수 있도록 설정해야한다.
아주 간단하게 @NoArgsConstructor 를 붙히는 방법으로 해결했다.
@Data
@NoArgsConstructor
@Schema(description = "offer 또는 answer를 받을 때 사용하는 DTO")
public class SdpResponseDto {
@Schema(description = "메시지를 보낸 사용자 정보(즉 자신이 아닌 통신하고 있는 유저 id)")
private UserInfoDto userInfo;
@Schema(description = "offer description 즉 createOffer 함수 이벤트 description 값을 그대로 여기 넣어서 보내면 됩니다.")
private SdpDto sessionDescription;
@Schema(description = "메시지 타입 \n"
+ "ScreenShare | Video", example = "ScreenShare")
private SignalMessageType type;
@Builder
public SdpResponseDto(UserInfoDto userInfo, SdpDto sessionDescription, SignalMessageType type) {
this.userInfo = userInfo;
this.sessionDescription = sessionDescription;
this.type = type;
}
}
결국 테스트 코드도 로직도 문제가 아닌 직렬화 문제였다..
@Test
@DisplayName("offer 테스트")
public void testOffer() throws InterruptedException, ExecutionException, TimeoutException {
BlockingQueue<SdpResponseDto> messageQueue = new LinkedBlockingQueue<>();
SdpMessageDto sdpMessageDto = new SdpMessageDto();
sdpMessageDto.setUserId(receiver.getId().toString());
sdpMessageDto.setType(ScreenShare);
SdpDto sdpDto = new SdpDto();
sdpDto.setSdp("sdp");
sdpDto.setType("offer");
sdpMessageDto.setSessionDescription(sdpDto);
//유저 2명 연결 1이 송신 2가 수신.
StompSession senderSession = connectWebSocket(sender);
StompSession receiverSession = connectWebSocket(receiver);
receiverSession.subscribe("/user/queue/offer/123", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
System.out.println("구독 시작: /user/queue/offer/123");
return SdpResponseDto.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
System.out.println("offer 응답값 : " + payload);
messageQueue.offer((SdpResponseDto)payload);
}
});
StompHeaders stompHeaders = new StompHeaders();
stompHeaders.setDestination(WEBSOCKET_TOPIC + "/offer/123");
senderSession.send(stompHeaders, sdpMessageDto);
SdpResponseDto receivedMessage = messageQueue.poll(5, TimeUnit.SECONDS);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getUserInfo().getUserId()).isEqualTo(sender.getId());
}
이후 다음과 같이 코드를 변경했고, 모든 테스트는 돌아갔다.
간단한 문제였지만, 안될때 당황하게 되는 법이다.
천천히 하나씩 찾아가면서 문제를 해결해보자.
'Backend' 카테고리의 다른 글
Aws SQS Spring으로 Listener Stop Start하기 (0) | 2024.08.13 |
---|---|
sqs spring boot 사용법 (0) | 2024.08.13 |
Spring Security 단위 테스트 (0) | 2024.08.07 |
[문제 해결] github Action + AWS CodeDeploy 배포 환경변수 삽질일기. (2) | 2024.08.07 |
서블릿이란 무엇인가 [서블릿 실행 흐름 편] (0) | 2024.05.08 |