elevne's Study Note

Spring WebSocket (STOMP) 본문

Backend/Spring

Spring WebSocket (STOMP)

elevne 2023. 6. 30. 11:21

pub/sub 구조에 대해 알아본다. pub/sub 는 아래 이미지와 같은 구조를 가진다.

 

 

pub/sub

 

 

Publisher, Subsriber 로 나뉘어지며 Subscriber 들은 특정 Topic(Channel) 을 구독한다. Publisher 가 해당 Topic 으로 메시지를 발행하면 해당 Topic 을 구독하는 사용자들은 그 메시지를 받게되는 것이다. 즉, 메시지의 발행자와 이용자 사이에 1:N 관계가 형성되는 브로드캐스트 스타일의 배포 방법이라고 볼 수 있다. (e.g., 항공사에서 항공편의 착륙 시간 또는 지연 상태에 관한 업데이트를 배포할 경우, 여러 관계자가 이 정보를 활용할 수 있다. 지상 팀은 항공기 정비 및 급유를 진행하고 수화물 팀, 승무원, 조종사는 비행기의 다음 일정을 준비하며 비주얼 디스플레이 운영 팀은 공항 이용객에게 이 정보를 공지한다. pub/sub 메시징 스타일은 이러한 시나리오에 적절하다)

 

 

 

STOMP 는 Ruby, Python, Perl 과 같은 스크립트 언어를 위해 고안된 단순한 메시징 프로토콜이다. 이는 메시징 프로토콜에서 일반적으로 사용되는 패턴들의 일부를 제공한다. STOMP 는 TCP 나 WebSocket 과 같은 신뢰성있는 양방향 Streaming Network Protocol 상에서 사용될 수 있다. STOMP 는 HTTP 에 모델링된 frame 기반 프로토콜이라고 한다. 아래는 frame 의 구조다.

 

 

COMMAND
header1:value1
header2:value2
 
Body^@

 

 

클라이언트는 메시지를 보내기 위해 SEND 명령을 수행하거나, 구독을 위해 SUBSCRIBE 명령을 수행할 수 있다. 이러한 명령어들은 destination 헤더를 요구하는데, 이는 메시지를 어디에 전송할지 혹은 어디에서 메시지를 받을지를 나타낸다.

 

 

SEND
destination:/queue/trade
content-type:application/json
content-length:44
 
{"action":"BUY","ticker":"MMM","shares",44}^@

 

 

STOMP 는 표준 메시지 포맷이며 서버, 클라이언트 양측에서 메시지를 해석, 처리할 수 있다. 또한, 외부의 Message Broker (RabbitMQ, ActiveMQ 등) 를 같이 사용할 수 있다는 장점이 있다.

 

 

 

여기서 Message Broker 이란 애플리케이션, 시스템 및 서비스가 서로 간에 통신하고 정보를 교환할 수 있도록 해주는 소프트웨어다. 메시지브로커는 메시지를 검증, 저장, 라우팅하고 이를 적절한 대상에 전달할 수 있다. 다른 애플리케이션 간의 중개자 역할을 함으로써 수신자의 위치, 수신자의 활성 여부 또는 수신자의 수를 잘 몰라도 송신자가 메시지를 발행할 수 있게 해준다. 확실한 메시지 저장 및 전달을 보장해야 하는 메시지브로커는 이용하는 애플리케이션에서 메시지를 처리할 수 있을 때까지 메시지를 저장하고 순서화하기 위해 메시지큐를 사용한다. 메시지 큐에서 메시지는 전송된 순서대로 저장되며, 수신이 확인될 때까지 큐에 남아있는다.

 

 

 

STOMP 기반 통신을 Spring 에서 구현해보았다. 우선 이를 위해 새로운 Config 파일을 아래와 같이 작성한다.

 

 

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic", "/queue");
        registry.setUserDestinationPrefix("/user");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat");
        registry.addEndpoint("/chat", "/chat/private").setAllowedOriginPatterns("*").withSockJS();
    }

}

 

 

@EnableWebSocket 에서 @EnableWebSocketMessageBroker 애노테이션으로 바뀌었다. 이 또한 @Configuration 클래스에 붙여주는 애노테이션으로 이는 @EnableWebSocket 과는 다르게 Broker 을 사용할 수 있게끔 해준다. 또, 위 클래스는

WebSocketMessageBrokerConfigurer 인터페이스를 구현하고 있다.

 

 

public interface WebSocketMessageBrokerConfigurer {

	/**
	 * Register STOMP endpoints mapping each to a specific URL and (optionally)
	 * enabling and configuring SockJS fallback options.
	 */
	default void registerStompEndpoints(StompEndpointRegistry registry) {
	}

	/**
	 * Configure options related to the processing of messages received from and
	 * sent to WebSocket clients.
	 */
	default void configureWebSocketTransport(WebSocketTransportRegistration registry) {
	}

	/**
	 * Configure the {@link org.springframework.messaging.MessageChannel} used for
	 * incoming messages from WebSocket clients. By default, the channel is backed
	 * by a thread pool of size 1. It is recommended to customize thread pool
	 * settings for production use.
	 */
	default void configureClientInboundChannel(ChannelRegistration registration) {
	}

	/**
	 * Configure the {@link org.springframework.messaging.MessageChannel} used for
	 * outbound messages to WebSocket clients. By default, the channel is backed
	 * by a thread pool of size 1. It is recommended to customize thread pool
	 * settings for production use.
	 */
	default void configureClientOutboundChannel(ChannelRegistration registration) {
	}

	/**
	 * Add resolvers to support custom controller method argument types.
	 * <p>This does not override the built-in support for resolving handler
	 * method arguments. To customize the built-in support for argument
	 * resolution, configure {@code SimpAnnotationMethodMessageHandler} directly.
	 * @param argumentResolvers the resolvers to register (initially an empty list)
	 * @since 4.1.1
	 */
	default void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
	}

	/**
	 * Add handlers to support custom controller method return value types.
	 * <p>Using this option does not override the built-in support for handling
	 * return values. To customize the built-in support for handling return
	 * values, configure  {@code SimpAnnotationMethodMessageHandler} directly.
	 * @param returnValueHandlers the handlers to register (initially an empty list)
	 * @since 4.1.1
	 */
	default void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
	}

	/**
	 * Configure the message converters to use when extracting the payload of
	 * messages in annotated methods and when sending messages (e.g. through the
	 * "broker" SimpMessagingTemplate).
	 * <p>The provided list, initially empty, can be used to add message converters
	 * while the boolean return value is used to determine if default message should
	 * be added as well.
	 * @param messageConverters the converters to configure (initially an empty list)
	 * @return whether to also add default converter or not
	 */
	default boolean configureMessageConverters(List<MessageConverter> messageConverters) {
		return true;
	}

	/**
	 * Configure message broker options.
	 */
	default void configureMessageBroker(MessageBrokerRegistry registry) {
	}

}

 

 

 

위 작성된 @Configuration 코드에서는 configureMessageBroker 메소드가 먼저 사용되었다. enableSimpleBroker 로 Message Broker 을 등록한다. 보통 "topic", "queue" 를 사용한다고 한다. "topic" 은 한 명이 메시지를 발행했을 때 해당 토픽을 구독하고 있는 n 명에게 메시지를 뿌려야하는 경우에, "queue" 는 한 명에게 정보를 보내는 경우에 사용한다. setApplicationDestinationPrefixes 는 도착 경로에 대한 prefix 를 설정한다. 위와 같이 "app" 으로 해두면 "/app/topic/..." 형태로 사용되는 것이다. setUserDestinationPrefix 는 User Destination 에 대한 옵션을 지정한다. Destination 은 각 유저별로 그들의 세션에 대한 유니크한 이름의 queue 를 구독, 또 그에 Publish 하게된다. 그 다음으로는 registerStompEndpoints 메소드를 사용하였다. 이전에 addHandler 을 작성했던 것과 유사하다. 연결한 소켓 엔드포인트를 지정, 여기에도 마찬가지로 addInterceptors 를 통해 HandshakeInterceptor 을 등록할 수 있다.

 

 

 

그 다음으로는 아래와 같이 Controller 을 작성한다.

 

 

@Controller
public class ChatController {

    @MessageMapping("/topic/{groupName}")
    @SendTo("/topic/message/{groupName}")
    public Message receivePublicMessage(@Payload Message message, @DestinationVariable("groupName") String groupName){
        return message;
    }

    @MessageMapping("/queue/{userName}")
    @SendTo("/queue/message/{userName}")
    public Message receivePrivateMessage(@Payload Message message, @DestinationVariable("userName") String userName) {
        return message;
    }

}

 

 

@MessageMapping 애노테이션은 Publish 하는 경로, @SendTo(@SendToUser 도 가능) 에는 Subscribe 하는 경로를 value 로 작성해준다. 특정 사용자가 @MessageMapping 내의 주소로 publish 할 경우 그 밑에 @SendTo 내에 들어있는 경로에 Subscribe 하는 사용자들에게 해당 메시지를 보내는 것이다. 아래와 같이 간단한 index.html 파일을 작성하여 테스트를 진행해볼 수 있다.

 

 

<html>
<head>
    <title>Chat WebSocket</title>
    <script type="text/javascript" src="https://unpkg.com/kd-shim-sockjs-client@0.3.4/sockjs-0.3.4.js"></script>
    <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script type="text/javascript">
        let stompClient = null;

        function setConnected(connected) {
            document.getElementById('connect').disabled = connected;
            document.getElementById('disconnect').disabled = !connected;
            document.getElementById('conversationDiv').style.visibility
                = connected ? 'visible' : 'hidden';
            document.getElementById('response').innerHTML = '';
        }

        function connect() {
            const socket = new SockJS('/chat');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function(frame) {
                setConnected(true);
                console.log('Connected: ' + frame);
                stompClient.subscribe('/queue/message/wonil',
                    function(messageOutput) {
                    showMessageOutput(JSON.parse(messageOutput.body));
                });
            });
        }

        function disconnect() {
            if(stompClient != null) {
                stompClient.disconnect();
            }
            setConnected(false);
            console.log("Disconnected");
        }

        function sendMessage() {
            const from = document.getElementById('from').value;
            const text = document.getElementById('text').value;
            const a = stompClient.send("/app/queue/wonil", {}, JSON.stringify({'from': from, 'text': text}));
            console.log(a)
        }

        function showMessageOutput(messageOutput) {
            const response = document.getElementById('response');
            const p = document.createElement('p');
            p.style.wordWrap = 'break-word';
            p.appendChild(document.createTextNode(messageOutput.from + ": "
                + messageOutput.text + " (" + messageOutput.time + ")"));
            response.appendChild(p);
        }
    </script>
</head>
<body onload="disconnect()">
<div>
    <div>
        <input type="text" id="from" placeholder="Choose a nickname"/>
    </div>
    <br />
    <div>
        <button id="connect" onclick="connect();">Connect</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect();">
            Disconnect
        </button>
    </div>
    <br />
    <div id="conversationDiv">
        <input type="text" id="text" placeholder="Write a message..."/>
        <button id="sendMessage" onclick="sendMessage();">Send</button>
        <p id="response"></p>
    </div>
</div>

</body>
</html>

 

 

 

 

 

 

 

 

 

Reference:

https://www.egovframe.go.kr/wiki/doku.php?id=egovframework:rte3.9:ptl:stomp 

https://aws.amazon.com/ko/what-is/pub-sub-messaging/

https://www.ibm.com/kr-ko/topics/message-brokers

https://hyeooona825.tistory.com/89

'Backend > Spring' 카테고리의 다른 글

Spring WebSocket (STOMP - Kafka)  (0) 2023.07.10
Spring WebSocket (STOMP - Redis)  (0) 2023.07.01
Spring Security : JWT 적용해보기 (3)  (0) 2023.06.29
Spring WebSocket  (0) 2023.06.28
Spring Security : JWT 적용해보기 (2)  (0) 2023.06.11