elevne's Study Note

Logstash 활용 본문

ETC/Elasticsearch

Logstash 활용

elevne 2022. 12. 23. 20:41

Elasticsearch 와 함께 활용되는 Logstash 는 모든 크기, 형태, 소스의 데이터 수집을 진행할 수 있다고 한다. Logstash 에는 200 개 이상의 플러그인 프레임워크가 있어 다양한 입력, 필터, 출력을 자유롭게 조절할 수 있다. Logstash 를 다양하게 활용할 수 있지만, 그 기능 중에서 Elasticsearch 에 RDBMS 에 있는 데이터를 옮기는 데 사용하였다.

 

 

 

 

 

 

이전 글에서 한 테이블의 데이터를 Elasticsearch 에 Indexing 하는 법을 간단히 알아보았다. 그런데 실제로 프로젝트를 진행하다 보니, Elasticsearch 에서 여러 개의 Index 를 다뤄야 했고, 하나의 Logstash 프로그램으로 여러 개의 Index 에 Indexing 하는 법을 알아봐야 했다. 이를 위해 아래와 같은 포맷에 맞춰 Conf 파일을 작성했다.

 

 

 

input {
  jdbc {
    # JDBC connection settings
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "user"
    jdbc_password => "password"

    # Polling settings
    schedule => "* * * * *"
    statement => "SELECT * FROM table1"
  }

  jdbc {
    # JDBC connection settings
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "user"
    jdbc_password => "password"

    # Polling settings
    schedule => "* * * * *"
    statement => "SELECT * FROM table2"
  }
}

output {
  elasticsearch {
    # Elasticsearch connection settings
    hosts => ["http://localhost:9200"]
    index => "index1"
  }

  elasticsearch {
    # Elasticsearch connection settings
    hosts => ["http://localhost:9200"]
    index => "index2"
  }
}

 

 

 

위처럼 하나의 conf 파일에 여러 개를 순서대로 작성해주면 된다. input 의 jdbc, output 의 elasticsearch 를 순서대로 작성하는 것이다. 

 

 

 

RDBMS 의 데이터를 Elasticsearch 로 가져올 때, Elasticsearch 의 "_id" 필드는 DB 의 "id", Primary Key 필드로 설정되어야 한다. 위에는 적어주지 않았지만, elasticsearch { } 내에 document_id = "%{id}" 와 같이 적어주어야 하는 것이다. 이는 DB 의 레코드와 Elasticsearch 문서 간에 직접 매핑을 제공해준다. 레코드가 DB 에서 업데이트되는 경우, 연결된 문서 전체가 Elasticsearch 에서 덮어쓰기된다. Elasticsearch 에서 문서를 덮어쓰는 것은 밑에서 다룰 업데이트 작업만큼은 효율적이지 않다고 한다. (내부적으로 업데이트는 구 문서를 삭제한 다음 새 문서 전체를 색인하는 작업으로 이루어지기 때문이다.)

 

 

 

DB 에 레코드가 삽입되거나 업데이트되면, 그 레코드는 업데이트 혹은 삽입 시간을 포함하는 필드를 가져야한다. 그 필드는 Logstash 가 그 Pooling 루프의 마지막 반복 계산 이래 수정되거나 삽입된 문서만 요청할 수 있도록 하는데 이용한다. Logstash 가 DB 에서 데이터를 풀링할 때마다 이것은 DB 에서 읽은 마지막 레코드의 업데이트 혹은 삽입 시간을 저장한다. 그 다음 번 반복 계산 시에 Logstash 는 풀링 루프의 이전 반복 계산에서 받았던 마지막 기록보다 새로운 업데이트나 삽입이 있는 레코드만 요청해야 한다는 것을 알고 있는 것이다.

 

 

 

statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE 
(UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) 
ORDER BY modification_time ASC"

 

 

 

위와 같이 modification_time 이라는 마지막 변경 시간을 저장한 값과, :sql_last_value (Logstash의 폴링 루프의 현재 반복 계산을 위한 시작점을 포함하는 기본 제공 매개변수이다)를 비교하며 데이터를 참조하는 것이다.

 

 

 

그런데 나의 경우에는 달랐던 것이 위 Conf 파일의 statement 에 해당하는 부분, 즉 데이터를 불러오는 쿼리가 일반적인 경우인 하나의 테이블에서 그 테이블의 정보들만을 가져오는 것이 아니라, 여러 개의 테이블을 JOIN 하며 참조하는 쿼리를 실행시키는 statement 를 사용하였다. 그래서 위에 작성한 것처럼, 직접 전부 다 다시 매핑하는 방법을 사용하였다. 이를 위해서는 위에서 언급한 것처럼 document_id 를 지정해주어야 한다. 아래와 같이 작성한다.

 

 

 

output {
  elasticsearch {
    # Elasticsearch connection settings
    hosts => ["http://localhost:9200"]
    index => "index1"
    document_id => "%{id_field}"
  }
}

 

 

 

기존에 Index 에 저장되어 있는 document 들의 document_id 와 비교하여, 새로 불러오는 데이터 중 같은 document_id 가 있는 가존의 document 들은 삭제하고 새로운 데이터를 저장하는 것이다. (기존의 것과 id 가 겹치지 않는 데이터는 새롭게 추가하는 것.) 위 모든 과정에서 tracking_column, document_id 등에 컬럼 명을 작성해줄 때 모두 소문자로 작성해주지 않으면 원하는 대로 작동하지 않는다!

 

 

 

Reference:

https://www.elastic.co/kr/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash

'ETC > Elasticsearch' 카테고리의 다른 글

Elasticsearch on Java Spring (2)  (0) 2022.10.27
Elasticsearch on Java Spring + (Search Queries...)  (0) 2022.10.26
Elasticsearch 정리!  (0) 2022.10.24