elevne's Study Note
Spring WebSocket (STOMP - Kafka) 본문
Apache Kafka 는 빠르고 확장 가능한 데이터 피드의 분산 스트리밍, 파이프라이닝 및 재생을 위한 실시간 스트리밍 데이터 처리 목적으로 설계된 오픈 소스 분산형 pub/sub 메시징 플랫폼이다. Kafka 는 서버 클러스터 내에서 데이터 스트림을 레코드로 유지하는 방식으로 작동하는 브로커 기반 솔루션이다. Kafka 서버는 여러 데이터 센터에 분산되어 있을 수 있으며 여러 서비스 인스턴스에 걸쳐 레코드 스트림(메시지)을 토픽으로 저장하여 데이터 지속성을 제공할 수 있다.
Kafka 는 Producer/Consumer 개념을 사용한다. Publisher/Subscriber 에 각각 매칭된다고 생각하면 된다. Producer 이 Topic 에 이벤트를 보내면 해당 이벤트는 Topic 의 각 Partition 에 분산되어 저장된다. Topic 을 구독하고 있는 Consumer Group 내의 Consumer 은 각각 1 개 이상의 Partition 으로부터 이벤트를 가져온다.
반대로 이전 시간에 사용했던 Redis 는 그룹이라는 개념이 없고, 각 Subscriber 이 channel 을 구독하고 있다. 이 때 channel 은 이벤트를 따로 저장하지 않는다. 만일 channel 에 이벤트가 도착했을 때 해당 채널의 Subscriber 이 없다면 이벤트는 사라진다.
Kafka 보다 Redis 가 속도 면에서 우월할 수 있다고 한다. 다만, Redis 는 데이터 유실의 문제가 발생할 수 있다. Kafka 는 대량의 데이터를 다룰 때 적합하며 확장성이 좋다는 장점이 있지만, Redis 에 비해 상대적으로 학습하기 어렵다는 점이 단점이다. 실시간 채팅 기능을 구현하기 위해서 둘 중 어떤 것을 사용해야하는지는 딱 잘라 말할 수 없는 것 같다. 사용 사례, 확장 요구 사항, 운영 및 관리 상황 등에 따라 유동적으로 결정해야하는 것 같다.
(Kakao 의 Alex 사례를 보면 Kafka 와 Redis 를 혼합하여 사용하기도 한다. https://tech.kakao.com/2020/06/08/websocket-part1/ )
이번에는 Spring Boot 와 Kafka 를 연동해보았다. (Kafka 는 Docker 로 설치했다) build.gradle 에 아래 dependency 를 추가해준다.
implementation 'org.springframework.kafka:spring-kafka'
그 다음, 먼저 Producer 에 대한 Configuration 을 작성한다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> producerConfig() {
HashMap<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
ProducerFactory 타입의 빈을 만들고(이 때 Map 객체를 넣어 설정을 넣어줄 수 있다), 이를 사용하여 KafkaTemplate 빈을 생성한다. KafkaTemplate 은 Kafka 와 high-level 작업들을 수행하기 위한 템플릿으로, thread-safe 하다. 위 템플릿 빈은 아래와 같이 사용할 수 있다.
@Component
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public void send(String topic, Message msg) throws JsonProcessingException {
String data = objectMapper.writeValueAsString(msg);
kafkaTemplate.send(topic, data);
}
}
KafkaTemplate 의 send 메소드를 통해 원하느 토픽으로 데이터를 전송할 수 있다.
그 다음으로는 Consumer 을 정의한다.
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> consumerConfig() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean(name = "factory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
factory(ConsumerFactory<String, String> consumerFactory)
{
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
}
위 코드 또한 마찬가지로 Map 으로 설정을 넣어 ConsumerFactory 빈을 만든다. 그 다음, ConcurrentKafkaListenerContainerFactory 객체에 setConsumerFactory 를 한 빈을 정의한다. 이는 아래와 같이 사용된다.
@Component
@RequiredArgsConstructor
public class KafkaListeners {
private final SimpMessageSendingOperations messageSending;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "kafkatopic", groupId = "groupId", containerFactory = "factory")
void listener(String data) {
try {
Message msg = objectMapper.readValue(data, Message.class);
messageSending.convertAndSend("/queue/message/wonil", data);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@KafkaListener 애노테이션 내에서 메시지리스너를 만들 때 사용할 KafkaListenerContainerFactory 객체의 빈 이름을 적어준다. (factory) 메소드 위에 이 애노테이션을 붙이고, 해당 클래스를 컴포넌트로 만들어주면 된다. 마지막으로 이전에 작성한 ChatController 에 아래 메소드를 추가했다.
@MessageMapping("/queue/kafka/wonil")
public void messageKafka(@Payload Message message) throws JsonProcessingException {
System.out.println("KAFKA CONTROLLER");
kafkaProducer.send("kafkatopic", message);
}
위 코드를 전부 작성한 뒤, 실시간 채팅을 테스트 해볼 수 있었다.
Reference:
https://www.tibco.com/ko/reference-center/what-is-apache-kafka
https://medium.com/frientrip/pub-sub-%EC%9E%98-%EC%95%8C%EA%B3%A0-%EC%93%B0%EC%9E%90-de9dc1b9f739
'Backend > Spring' 카테고리의 다른 글
Spring Test Code 작성하기 (0) | 2023.08.01 |
---|---|
Spring Boot Logback (0) | 2023.07.25 |
Spring WebSocket (STOMP - Redis) (0) | 2023.07.01 |
Spring WebSocket (STOMP) (0) | 2023.06.30 |
Spring Security : JWT 적용해보기 (3) (0) | 2023.06.29 |