[Java] Cassandra(nosql)를 사용하는 방법


Development note/Java  2020. 6. 8. 18:06

안녕하세요. 명월입니다.


이 글은 Java에서 cassandra(nosql)를 사용하는 방법에 대한 글입니다.


이전에 Linux에서 Cassandra를 설치하는 방법에 대해 설명한 적이 있습니다.

링크 - [CentOS] Linux환경에서 Cassandra(NoSql DB)를 설치하는 방법(DBeaver로 접속하기)


Java 프로그램 안에서 Cassandra에 접속해서 데이터를 취득 및 삽입, 삭제하겠습니다.

Cassandra를 사용하기 위해서 라이브러리를 업데이트해야 합니다.

레포지토리 - https://mvnrepository.com/artifact/com.datastax.oss/java-driver-core/4.6.1

레포지토리 - https://mvnrepository.com/artifact/com.datastax.oss/java-driver-query-builder/4.6.1

레포지토리 - https://mvnrepository.com/artifact/com.datastax.oss/java-driver-mapper-runtime/4.6.1


먼저 기본 접속 소스 입니다.

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.List;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
// try로 자동 리소스 반환(close)를 사용하기 위해 Closeable 인터페이스 상속
class CassandraConnector implements Closeable {
  // Cql 쿼리를 사용하기 위한 CqlSession
  private CqlSession session;

  // 생성자 파라미터 접속 ip, port, datacenter, id, pw
  public CassandraConnector(String ip, Integer port, String datacenter, String id, String pw) {
    // builder를 생성
    CqlSessionBuilder b = CqlSession.builder();
    // 접속 포인트 설정
    b.addContactPoint(new InetSocketAddress(ip, port));
    // 아이디, 패스워드 설정
    b.withAuthCredentials(id, pw);
    // 데이터센터 설정
    b.withLocalDatacenter(datacenter);
    // 세션 생성
    session = b.build();
  }
  // 세션 생성
  public CqlSession getSession() {
    return this.session;
  }
  // 세션 닫기
  @Override
  public void close() {
    session.close();
  }
}
public class Example {
  // 실행 함수
  public static void main(String[] args) {
    // cassandra 접속 (try 영역을 벗어나면 자동 close)
    try (CassandraConnector client = new CassandraConnector("192.168.1.200", 9042, "datacenter1", "nowonbun", "a12345")) {
      // 세션 취득
      CqlSession session = client.getSession();
      // 쿼리 검색
      ResultSet result = session.execute("select * from test.test");
      // 검색 결과
      List<Row> list = result.all();
      // 콘솔 출력
      for (Row row : list) {
        System.out.println(row.getObject(0));
        System.out.println(row.getObject(1));
      }
    }
  }
}

여기서 datacenter의 값은 cassandra 서버에서 nodetool status로 확인할 수 있습니다.

예전에 제가 test 테이블에 hello world 데이터를 넣어 놓은게 있습니다. select로 검색하겠습니다.

여기서 session을 취득하고 execute함수로 insert, update, delete를 해도 상관은 없습니다.

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.List;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
// try로 자동 리소스 반환(close)를 사용하기 위해 Closeable 인터페이스 상속
class CassandraConnector implements Closeable {
  // Cql 쿼리를 사용하기 위한 CqlSession
  private CqlSession session;
  // 생성자 파라미터 접속 ip, port, datacenter, id, pw
  public CassandraConnector(String ip, Integer port, String datacenter, String id, String pw) {
    // builder를 생성
    CqlSessionBuilder b = CqlSession.builder();
    // 접속 포인트 설정
    b.addContactPoint(new InetSocketAddress(ip, port));
    // 아이디, 패스워드 설정
    b.withAuthCredentials(id, pw);
    // 데이터센터 설정
    b.withLocalDatacenter(datacenter);
    // 세션 생성
    session = b.build();
  }
  // 세션 생성
  public CqlSession getSession() {
    return this.session;
  }
  // 세션 닫기
  @Override
  public void close() {
    session.close();
  }
}
public class Example {
  // 실행 함수
  public static void main(String[] args) {
    int idx = 1;
    String contents = "hello world";
    // cassandra 접속 (try 영역을 벗어나면 자동 close)
    try (CassandraConnector client = new CassandraConnector("192.168.1.200", 9042, "datacenter1", "nowonbun", "a12345")) {
      // 세션 취득
      CqlSession session = client.getSession();
      // 테이블 생성
      session.execute("create table test.test1 (idx bigint,contents text,primary key(idx))");
      // 데이터 입력
      session.execute("insert into test.test1(idx,contents)values(" + idx + ",'" + contents + "')");
      // 쿼리 검색
      ResultSet result = session.execute("select * from test.test1");
      // 검색 결과
      List<Row> list = result.all();
      // 콘솔 출력
      for (Row row : list) {
        System.out.println(row.getObject(0));
        System.out.println(row.getObject(1));
      }
    }
  }
}

그런데 다 String으로 쿼리를 처리하게 되면 Query에서 에러가 있을 경우 채크가 되지 않고 쿼리 상으로 클래스를 관리하게 되면 클래스의 쿼리를 관리하기도 어렵습니다. 예를 들면 테이블 구조가 바뀐다던지... 테이블 이름이 바뀐다던지...

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
import com.datastax.oss.driver.api.querybuilder.insert.InsertInto;
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTable;
import com.datastax.oss.driver.api.querybuilder.schema.Drop;
import com.datastax.oss.driver.api.querybuilder.select.Select;
// try로 자동 리소스 반환(close)를 사용하기 위해 Closeable 인터페이스 상속
class CassandraConnector implements Closeable {
  // Cql 쿼리를 사용하기 위한 CqlSession
  private CqlSession session;

  // 생성자 파라미터 접속 ip, port, datacenter, id, pw
  public CassandraConnector(String ip, Integer port, String datacenter, String id, String pw) {
    // builder를 생성
    CqlSessionBuilder b = CqlSession.builder();
    // 접속 포인트 설정
    b.addContactPoint(new InetSocketAddress(ip, port));
    // 아이디, 패스워드 설정
    b.withAuthCredentials(id, pw);
    // 데이터센터 설정
    b.withLocalDatacenter(datacenter);
    // 세션 생성
    session = b.build();
  }
  // 세션 생성
  public CqlSession getSession() {
    return this.session;
  }
  // 세션 닫기
  @Override
  public void close() {
    session.close();
  }
}
// 카산드라 상위 테이블
class CassandraTable {
  // KeySpace 설정
  private final String keySpaceName = "test";
  // 세션 설정
  private final CqlSession session;
  // 세션 설정
  public CassandraTable(CqlSession session) {
    this.session = session;
  }
  // KeySpace 생성
  protected void createKeySpace(String keyspaceName, int numberOfReplicas) {
    // keyspace 설정
    CreateKeyspace query = SchemaBuilder.createKeyspace(keyspaceName)
                                        .ifNotExists() // 있으면 생략
                                        .withSimpleStrategy(numberOfReplicas);
    // 쿼리 실행
    session.execute(query.build());
  }
  // KeySpace 선택
  protected void useKeyspace(String keyspace) {
    session.execute("USE " + keyspace);
  }
  // Statement로 쿼리 실행
  protected ResultSet executeStatement(SimpleStatement statement) {
    // KeySpace 선택
    useKeyspace(keySpaceName);
    try {
      // 실행
      return session.execute(statement);
    } catch (DriverTimeoutException e) {
      return null;
    }
  }
  // String 쿼리 실행
  protected ResultSet execute(String query) {
    // KeySpace 선택
    useKeyspace(keySpaceName);
    try {
      // 실행
      return session.execute(query);
    } catch (DriverTimeoutException e) {
      return null;
    }
  }
  // 세션 취득
  protected CqlSession getSession() {
    return this.session;
  }
}
// Test1 테이블
class Test1Table extends CassandraTable {
  // 생성자로 세션 설정
  public Test1Table(CqlSession session) {
    super(session);
  }
  // 테이블 생성
  protected void createTable() {
    // 쿼리 생성
    CreateTable createTable = SchemaBuilder.createTable("Test1")
                                           .ifNotExists() // 없으면 실행
                                           .withPartitionKey("idx", DataTypes.INT) // idx 컬럼 설정
                                           .withColumn("contents", DataTypes.TEXT); // contents 컬럼 설정
    // 쿼리 실행
    executeStatement(createTable.build());
  }
  // 테이블 삭제
  protected void dropTable() {
    // 쿼리 생성
    Drop dropTable = SchemaBuilder.dropTable("Test1")
                                  .ifExists(); // 있으면 실행
    // 쿼리 실행
    executeStatement(dropTable.build());
  }
  // idx의 최대 값 취득
  public int getMaxIdx() {
    // String 쿼리로 최대값 취득
    ResultSet resultSet = execute("select max(idx) from test.Test1");
    // 컬럼은 한개
    Row row = resultSet.one();
    if (row == null) {
      // 없으면 0
      return 0;
    }
    // 있으면 최대값 리턴
    return row.getInt(0);
  }
  // 데이터 insert
  public void insertData(String contents) {
    // 최대 idx 값 취득
    int idx = getMaxIdx();
    // 데이터 insert
    this.insertData(idx + 1, contents);
  }
  // 데이터 insert
  public void insertData(int idx, String contents) {
    // 쿼리 생성
    InsertInto into = QueryBuilder.insertInto("Test1");
    // 쿼리에서 idx와 contents는 바인딩 설정
    RegularInsert insert = into.value("idx", QueryBuilder.bindMarker())
                               .value("contents", QueryBuilder.bindMarker());
    // 바인딩 설정
    BoundStatement statement = getSession().prepare(insert.build())
                                           .bind()
                                           .setInt(0, idx) // idx 컬럼 설정
                                           .setString(1, contents); // contents 설정
    // 쿼리 실행
    getSession().execute(statement);
  }
  // 검색
  public List<Test1Row> selectAll() {
    // 검색 쿼리
    Select select = QueryBuilder.selectFrom("Test1").all();
    // 값을 취득
    ResultSet resultSet = executeStatement(select.build());
    // Test1Row 클래스로 생성
    List<Test1Row> result = new ArrayList<>();
    // 리스트에 값 설정
    resultSet.forEach(x -> result.add(new Test1Row(x.getInt(0), x.getString(1))));
    // 결과 리스트 리턴
    return result;
  }
  // 데이터 삭제
  public void deleteData(int idx) {
    // 쿼리 생성
    Delete delete = QueryBuilder.deleteFrom("Test1")
                                // idx 컬럼
                                .whereColumn("idx")
                                // 비교한다.
                                .isEqualTo(QueryBuilder.bindMarker());
    // 쿼리 실행
    executeStatement(delete.build(idx));
  }
}
// Test1 테이블
class Test1Row {
  // idx 컬럼 
  private int idx;
  // contents 컬럼
  private String contents;
  // 생성자
  public Test1Row(int idx, String contents) {
    this.idx = idx;
    this.contents = contents;
  }
  public int getIdx() {
    return idx;
  }
  public void setIdx(int idx) {
    this.idx = idx;
  }
  public String getContents() {
    return contents;
  }
  public void setContents(String contents) {
    this.contents = contents;
  }
}
public class Example {
  // 실행 함수
  public static void main(String[] args) {
    // cassandra 접속 (try 영역을 벗어나면 자동 close)
    try (CassandraConnector client = new CassandraConnector("192.168.1.200", 9042, "datacenter1", "nowonbun", "a12345")) {
      // Test1 테이블 인스턴스 생성
      Test1Table test1 = new Test1Table(client.getSession());
      // 테이블 삭제
      test1.dropTable();
      // 테이블 생성
      test1.createTable();
      // 데이터 추가
      test1.insertData("hello world");
      // 취득하기
      List<Test1Row> list = test1.selectAll();
      // 콘솔 출력
      for (Test1Row row : list) {
        System.out.println("idx - " + row.getIdx() + " contents - " + row.getContents());
      }
    } catch (Throwable e) {
      e.printStackTrace();
    }
  }
}

Nosql(Cassandra)는 ORM 프레임 워크가 없는게 참 아쉽네요... 시간이 된다면 제가 ORM 프레임 워크를 작성하고 싶습니다.


참조 - https://docs.datastax.com/en/developer/java-driver/4.6/manual/query_builder/

참조 - https://www.baeldung.com/cassandra-datastax-java-driver


여기까지 Java에서 cassandra(nosql)를 사용하는 방법에 대한 글이었습니다.


궁금한 점이나 잘못된 점이 있으면 댓글 부탁드립니다.