elevne's Study Note

Spring WebSocket (STOMP - Kafka) 본문

Backend/Spring

Spring WebSocket (STOMP - Kafka)

elevne 2023. 7. 10. 21:25

Apache Kafka 는 빠르고 확장 가능한 데이터 피드의 분산 스트리밍, 파이프라이닝 및 재생을 위한 실시간 스트리밍 데이터 처리 목적으로 설계된 오픈 소스 분산형 pub/sub 메시징 플랫폼이다. Kafka 는 서버 클러스터 내에서 데이터 스트림을 레코드로 유지하는 방식으로 작동하는 브로커 기반 솔루션이다. Kafka 서버는 여러 데이터 센터에 분산되어 있을 수 있으며 여러 서비스 인스턴스에 걸쳐 레코드 스트림(메시지)을 토픽으로 저장하여 데이터 지속성을 제공할 수 있다.

 

 

Kafka

 

 

 

Kafka 는 Producer/Consumer 개념을 사용한다. Publisher/Subscriber 에 각각 매칭된다고 생각하면 된다. Producer 이 Topic 에 이벤트를 보내면 해당 이벤트는 Topic 의 각 Partition 에 분산되어 저장된다. Topic 을 구독하고 있는 Consumer Group 내의 Consumer 은 각각 1 개 이상의 Partition 으로부터 이벤트를 가져온다.

 

 

 

반대로 이전 시간에 사용했던 Redis 는 그룹이라는 개념이 없고, 각 Subscriber 이 channel 을 구독하고 있다. 이 때 channel 은 이벤트를 따로 저장하지 않는다. 만일 channel 에 이벤트가 도착했을 때 해당 채널의 Subscriber 이 없다면 이벤트는 사라진다.

 

 

 

Kafka 보다 Redis 가 속도 면에서 우월할 수 있다고 한다. 다만, Redis 는 데이터 유실의 문제가 발생할 수 있다. Kafka 는 대량의 데이터를 다룰 때 적합하며 확장성이 좋다는 장점이 있지만, Redis 에 비해 상대적으로 학습하기 어렵다는 점이 단점이다. 실시간 채팅 기능을 구현하기 위해서 둘 중 어떤 것을 사용해야하는지는 딱 잘라 말할 수 없는 것 같다. 사용 사례, 확장 요구 사항, 운영 및 관리 상황 등에 따라 유동적으로 결정해야하는 것 같다.

 

(Kakao 의 Alex 사례를 보면 Kafka 와 Redis 를 혼합하여 사용하기도 한다. https://tech.kakao.com/2020/06/08/websocket-part1/ )

 

 

 

이번에는 Spring Boot 와 Kafka 를 연동해보았다. (Kafka 는 Docker 로 설치했다) build.gradle 에 아래 dependency 를 추가해준다.

 

 

implementation 'org.springframework.kafka:spring-kafka'

 

 

 

그 다음, 먼저 Producer 에 대한 Configuration 을 작성한다.

 

 

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

 

 

ProducerFactory 타입의 빈을 만들고(이 때 Map 객체를 넣어 설정을 넣어줄 수 있다), 이를 사용하여 KafkaTemplate 빈을 생성한다. KafkaTemplate 은 Kafka 와 high-level 작업들을 수행하기 위한 템플릿으로, thread-safe 하다. 위 템플릿 빈은 아래와 같이 사용할 수 있다.

 

 

@Component
@RequiredArgsConstructor
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public void send(String topic, Message msg) throws JsonProcessingException {
        String data = objectMapper.writeValueAsString(msg);
        kafkaTemplate.send(topic, data);
    }

}

 

 

KafkaTemplatesend 메소드를 통해 원하느 토픽으로 데이터를 전송할 수 있다.

 

 

 

그 다음으로는 Consumer 을 정의한다.

 

 

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> consumerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean(name = "factory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        factory(ConsumerFactory<String, String> consumerFactory)
    {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}

 

 

위 코드 또한 마찬가지로 Map 으로 설정을 넣어 ConsumerFactory 빈을 만든다. 그 다음, ConcurrentKafkaListenerContainerFactory 객체에 setConsumerFactory 를 한 빈을 정의한다. 이는 아래와 같이 사용된다.

 

 

@Component
@RequiredArgsConstructor
public class KafkaListeners {

    private final SimpMessageSendingOperations messageSending;
    private final ObjectMapper objectMapper;
    @KafkaListener(topics = "kafkatopic", groupId = "groupId",  containerFactory = "factory")
    void listener(String data) {
        try {
            Message msg = objectMapper.readValue(data, Message.class);
            messageSending.convertAndSend("/queue/message/wonil", data);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

 

@KafkaListener 애노테이션 내에서 메시지리스너를 만들 때 사용할 KafkaListenerContainerFactory 객체의 빈 이름을 적어준다. (factory) 메소드 위에 이 애노테이션을 붙이고, 해당 클래스를 컴포넌트로 만들어주면 된다. 마지막으로 이전에 작성한 ChatController 에 아래 메소드를 추가했다.

 

 

@MessageMapping("/queue/kafka/wonil")
public void messageKafka(@Payload Message message) throws JsonProcessingException {
    System.out.println("KAFKA CONTROLLER");
    kafkaProducer.send("kafkatopic", message);
}

 

 

 

위 코드를 전부 작성한 뒤, 실시간 채팅을 테스트 해볼 수 있었다.

 

 

result

 

 

 

 

 

 

 

 

 

Reference:

https://www.tibco.com/ko/reference-center/what-is-apache-kafka

https://medium.com/frientrip/pub-sub-%EC%9E%98-%EC%95%8C%EA%B3%A0-%EC%93%B0%EC%9E%90-de9dc1b9f739 

'Backend > Spring' 카테고리의 다른 글

Spring Test Code 작성하기  (0) 2023.08.01
Spring Boot Logback  (0) 2023.07.25
Spring WebSocket (STOMP - Redis)  (0) 2023.07.01
Spring WebSocket (STOMP)  (0) 2023.06.30
Spring Security : JWT 적용해보기 (3)  (0) 2023.06.29