[Java] NIO(Non-Blocking IO) Socket 통신


Development note/Java  2020. 2. 4. 09:00

안녕하세요. 명월입니다.


이 글은 Java에서 NIO Socket 통신에 대한 글입니다.


예전에 Java환경에서 소켓 통신을 하는 방법에 대한 글을 작성한 적이 있습니다.

링크 - [Java강좌 - 23] 소켓 통신 (Socket)


이전에 소켓 방식에서 다중 접속을 하려고 한다면 server.accept에서 무한 loop를 걸고 accept되는 시점에서 Socket를 받아서 OutputStream과 InputStream을 받아서 또 다른 쓰레드를 만들어 Socket의 접속 상태를 유지해야 합니다.

import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {
  private static char CR = (char) 0x0D;
  private static char LF = (char) 0x0A;

  public static void main(String... args) {
    // 싱글 쓰레드 풀
    Executors.newSingleThreadExecutor().execute(() -> {
      // Server 소켓 생성
      try (ServerSocket server = new ServerSocket()) {
        // 서버 대기 (Listen)할 포트 설정
        server.bind(new InetSocketAddress(10000));
        // 다중 접속을 위한 다중 쓰레드 풀 생성
        ExecutorService service = Executors.newCachedThreadPool();
        while (true) {
          // Client 접속
          final Socket client = server.accept();
          // 다중 쓰레드 풀에 client을 넣는다. 그리고 위 accept를 다시 대기해야지
          service.execute(() -> {
            // client로 메시지를 전송할 sender와 메시지를 받을 receiver를 
            try (final OutputStream sender = client.getOutputStream();
               final InputStream receiver = client.getInputStream();) {
              // 메시지를 모으기 위한 버퍼
              StringBuffer sb = new StringBuffer();
              // 메시지 발송
              Runnable send = () -> {
                try {
                  // 버퍼로 부터 데이터를 가져와서 byte형식으로 변환.
                  byte[] data = sb.toString().getBytes();
                  // 메일 발송.
                  sender.write(data, 0, data.length);
                  sb.setLength(0);
                } catch (Throwable e) {
                  e.printStackTrace();
                }
              };
              sb.append("Welcome server!\r\n>");
              // welcome 메시지 발송
              send.run();
              while (true) {
                // 수신 대기 버퍼
                byte[] data = new byte[1024];
                // 메시지를 수신한다.
                receiver.read(data, 0, data.length);
                // 버퍼의 빈 값(\0)을 제거
                String buffer = new String(data).replace("\0", "");
                sb.append(buffer);
                // 메시지 끝이 개행일 경우
                if (sb.length() > 2 && sb.charAt(sb.length() - 2) == CR
                    && sb.charAt(sb.length() - 1) == LF) {
                  // 개행 제거
                  sb.setLength(sb.length() - 2);
                  // 버퍼를 string으로 변환
                  String msg = sb.toString();
                  // 콘솔에 표시
                  System.out.println(msg);
                  // exit경우 접속 정료
                  if ("exit".equals(msg)) {
                    break;
                  }
                  // echo 메시지 만들기
                  sb.insert(0, "Echo - ");
                  sb.append("\r\n>");
                  // 에코 메시지 송신
                  send.run();
                }
              }
            } catch (Throwable e) {
              e.printStackTrace();
            } finally {
              // 서버 종료
              try {
                client.close();
              } catch (Throwable e1) {
                e1.printStackTrace();
              }
            }
          });
        }
      } catch (Throwable e) {
        e.printStackTrace();
      }
    });
  }
}

위 예제는 제가 ServerSocket으로 만든 java 소켓 통신 예제입니다.

일단 간단한 예제를 위해서 main함수에 작성을 헀는데, 여기서 확인해야 할 사항은 accept를 한 후에 다중 스레드에 OutputStream과 InputStream을 선언한 스레드를 만들었습니다.

만약 클라이언트가 한 두개의 접속이라면 크게 부담되지 않습니다만, 많은 접속이 발생할 경우 접속할 때마다 쓰레드가 무한이 증가하는 형식입니다. 쓰레드 자체는 꽤 많은 리소스를 잡아먹기 때문에 접속이 많아 진다면 서버가 느려지는 현상이 발생할 수도 있습니다.


그래서 이런 부담을 줄여주기 위해 MS에서는 IOCP를 지원합니다.

링크 - [C#] 비동기 소켓 통신(IOCP) - EAP 패턴

링크 - C#] 비동기 소켓 통신(IOCP) - APM 패턴

IOCP는 간단하게 설명하면 모든 접속의 accept을 IOCP의 큐에 집어넣어서 Client의 요청이 있을 시에 큐에서 꺼내어 처리하는 방식입니다.


아쉽게도 java에서는 IOCP기능이 없습니다. IOCP는 Window OS전용입니다.

그러나 java에서는 NIO기능이라 하여 Non-blocking I/O를 지원합니다.

NIO기능은 IOCP와는 개념이 완전 다릅니다. 보통 socket에서는 accept를 실행하면 Block이 되어 버립니다. 접속을 기다리는 형태가 되는 것입니다.

그렇기 때문에 접속을 할 때마다 Thread에 Stream를 넣어서 accept의 block를 피하는 것입니다.

Channel이라는 클래스에서 accpet, connect, send, receive의 type의 selector를 만들어서 대기하는 형태입니다.


솔직히 제가 이런 이론적인 설명은 100% 정확하게 잘 모릅니다. 이렇다고 하네요. 개인적으로 그게 그거 아닌가 싶기도 하고.. IOCP와 NIO는 개념이 다르다고는 하는데..

링크 - https://en.wikipedia.org/wiki/Non-blocking_I/O_(Java)

결론은 그래서 NIO를 사용하게 되면 저 Client 접속 단위로 Thread 생성이 없어지기 때문에 성능이 많이 올라갑니다.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Executors;

public class Server implements Runnable {

  private InetSocketAddress listenAddress;
  // 메시지는 개행으로 구분한다.
  private static char CR = (char) 0x0D;
  private static char LF = (char) 0x0A;
  // ip와 port 설정
  public Server(String address, int port) {
    listenAddress = new InetSocketAddress(address, port);
  }
  // Thread 실행.
  public void run() {
    // 셀렉터 설정
    try (Selector selector = Selector.open()) {
      // 채널 설정
      try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
        // non-Blocking 설정
        serverChannel.configureBlocking(false);
        // 서버 ip, port 설정
        serverChannel.socket().bind(listenAddress);
        // 채널에 accept 대기 설정
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 셀렉터가 있을 경우.
        while (selector.select() > 0) {
          // 셀렉터 키 셋를 가져온다.
          Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
          // 키가 있으면..
          while (keys.hasNext()) {
            SelectionKey key = keys.next();
            //키 셋에서 제거.
            keys.remove();
            if (!key.isValid()) {
              continue;
            }
            // 접속일 경우..
            if (key.isAcceptable()) {
              this.accept(selector, key);
            // 수신일 경우..
            } else if (key.isReadable()) {
              this.receive(selector, key);
            // 발신일 경우..
            } else if (key.isWritable()) {
              this.send(selector, key);
            }
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  // 접속시 호출 함수..
  private void accept(Selector selector, SelectionKey key) {
    try {
      // 키 채널을 가져온다.
      ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
      // accept을 해서 Socket 채널을 가져온다.
      SocketChannel channel = serverChannel.accept();
      channel.configureBlocking(false);
      // 소켓 취득
      Socket socket = channel.socket();
      SocketAddress remoteAddr = socket.getRemoteSocketAddress();
      System.out.println("Connected to: " + remoteAddr);
      // 접속 Socket 단위로 사용되는 Buffer;
      StringBuffer sb = new StringBuffer();
      sb.append("Welcome server!\r\n>");
      // Socket 채널을 channel에 송신 등록한다
      channel.register(selector, SelectionKey.OP_WRITE, sb);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  // 수신시 호출 함수..
  private void receive(Selector selector, SelectionKey key) {
    try {
      // 키 채널을 가져온다.
      SocketChannel channel = (SocketChannel) key.channel();
      // 채널 Non-blocking 설정
      channel.configureBlocking(false);
      // 소켓 취득
      Socket socket = channel.socket();
      // Byte 버퍼 생성
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      // ***데이터 수신***
      int size = channel.read(buffer);
      // 수신 크기가 없으면 소켓 접속 종료.
      if (size == -1) {
        SocketAddress remoteAddr = socket.getRemoteSocketAddress();
        System.out.println("Connection closed by client: " + remoteAddr);
        // 소켓 채널 닫기
        channel.close();
        // 소켓 닫기
        socket.close();
        // 키 닫기
        key.cancel();
        return;
      }
      // ByteBuffer -> byte[]
      byte[] data = new byte[size];
      System.arraycopy(buffer.array(), 0, data, 0, size);
      // StringBuffer 취득
      StringBuffer sb = (StringBuffer) key.attachment();
      // 버퍼에 수신된 데이터 추가
      sb.append(new String(data));
      // 데이터 끝이 개행 일 경우.
      if (sb.length() > 2 && sb.charAt(sb.length() - 2) == CR && sb.charAt(sb.length() - 1) == LF) {
        // 개행 삭제
        sb.setLength(sb.length() - 2);
        // 메시지를 콘솔에 표시한다.
        String msg = sb.toString();
        System.out.println(msg);
        // exit 경우 접속을 종료한다.
        if ("exit".equals(msg)) {
          // 소켓 채널 닫기
          channel.close();
          // 소켓 닫기
          socket.close();
          // 키 닫기
          key.cancel();
          return;
        }
        // Echo - 메시지> 의 형태로 재 전송.
        sb.insert(0, "Echo - ");
        sb.append("\r\n>");
        // Socket 채널을 channel에 송신 등록한다
        channel.register(selector, SelectionKey.OP_WRITE, sb);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  // 발신시 호출 함수
  private void send(Selector selector, SelectionKey key) {
    try {
      // 키 채널을 가져온다.
      SocketChannel channel = (SocketChannel) key.channel();
      // 채널 Non-blocking 설정
      channel.configureBlocking(false);
      // StringBuffer 취득
      StringBuffer sb = (StringBuffer) key.attachment();
      String data = sb.toString();
      // StringBuffer 초기화
      sb.setLength(0);
      // byte 형식으로 변환
      ByteBuffer buffer = ByteBuffer.wrap(data.getBytes());
      // ***데이터 송신***
      channel.write(buffer);
      // Socket 채널을 channel에 수신 등록한다
      channel.register(selector, SelectionKey.OP_READ, sb);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  // 시작 함수
  public static void main(String[] args) {
    // 포트는 10000을 Listen한다.
    Executors.newSingleThreadExecutor().execute(new Server("localhost", 10000));
  }
}

위 소스는 Window의 telnet에서 메시지를 주고 받을 수 있게 맞추어서 작성했습니다.


그럼 telnet으로 접속해 보겠습니다.

ip는 로컬(127.0.0.1)로 설정하고 포트는 10000번입니다.

텔넷에서 메시지를 보내니 서버 측에서 메시지를 제대로 받습니다. echo도 제대로 수신됩니다.

종료까지 깔끔하게 통신이 이루어 집니다.


서버는 만들었고 이제는 Client를 만들겠습니다. client라고 해봐야 서버 소스와 거의 비슷합니다.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.Executors;

public class Client implements Runnable {
  private InetSocketAddress connectAddress;
  private static char CR = (char) 0x0D;
  private static char LF = (char) 0x0A;
  private Scanner scanner = new Scanner(System.in);
  // ip와 port 설정
  public Client(String address, int port) {
    connectAddress = new InetSocketAddress(address, port);
  }
  // Thread 실행.
  public void run() {
    // 셀렉터 설정
    try (Selector selector = Selector.open()) {
      // 소켓 접속
      SocketChannel channel = SocketChannel.open(connectAddress);
      // 채널 Non-blocking 설정
      channel.configureBlocking(false);
      // Socket 채널을 channel에 송신 등록한다
      channel.register(selector, SelectionKey.OP_READ, new StringBuffer());
      // 셀렉터가 있을 경우.
      while (selector.select() > 0) {
        // 셀렉터 키 셋를 가져온다.
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        // 키가 있으면..
        while (keys.hasNext()) {
          SelectionKey key = keys.next();
          //키 셋에서 제거.
          keys.remove();
          if (!key.isValid()) {
            continue;
          }
          // 수신일 경우..
          if (key.isReadable()) {
            this.receive(selector, key);
          // 발신일 경우..
          } else if (key.isWritable()) {
            this.send(selector, key);
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  // 발신시 호출 함수
  private void receive(Selector selector, SelectionKey key) {
    try {
      // 키 채널을 가져온다.
      SocketChannel channel = (SocketChannel) key.channel();
      // 채널 Non-blocking 설정
      channel.configureBlocking(false);
      // 소켓 취득
      Socket socket = channel.socket();
      // Byte 버퍼 생성
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      // ***데이터 수신***
      int size = channel.read(buffer);
      // ***데이터 수신***
      if (size == -1) {
        SocketAddress remoteAddr = socket.getRemoteSocketAddress();
        System.out.println("Connection closed by client: " + remoteAddr);
        // 소켓 채널 닫기
        channel.close();
        // 소켓 닫기
        socket.close();
        // 키 닫기
        key.cancel();
        return;
      }
      // ByteBuffer -> byte[]
      byte[] data = new byte[size];
      System.arraycopy(buffer.array(), 0, data, 0, size);
      // StringBuffer 취득
      StringBuffer sb = (StringBuffer) key.attachment();
      // 버퍼에 수신된 데이터 추가
      sb.append(new String(data));
      // 데이터 끝이 개행 일 경우. + >
      if (sb.length() > 3 && sb.charAt(sb.length() - 3) == CR && sb.charAt(sb.length() - 2) == LF && sb.charAt(sb.length() - 1) == '>') {
        // 메시지를 콘솔에 표시
        String msg = sb.toString();
        System.out.print(msg);
        // StringBuffer 초기화
        sb.setLength(0);
        // Socket 채널을 channel에 송신 등록한다
        channel.register(selector, SelectionKey.OP_WRITE, sb);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }

  }
  // 발신시 호출 함수
  private void send(Selector selector, SelectionKey key) {
    try {
      // 키 채널을 가져온다.
      SocketChannel channel = (SocketChannel) key.channel();
      // 채널 Non-blocking 설정
      channel.configureBlocking(false);
      // 콘솔에서 값을 입력 받는다.
      String msg = scanner.next() + "\r\n";
      ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
      // ***데이터 송신***
      channel.write(buffer);
      // Socket 채널을 channel에 수신 등록한다
      channel.register(selector, SelectionKey.OP_READ, key.attachment());
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  // 시작 함수
  public static void main(String[] args) {
    // 포트는 10000을 Listen한다.
    Executors.newSingleThreadExecutor().execute(new Server("localhost", 10000));
    // 클라이언트 쓰레드
    Executors.newSingleThreadExecutor().execute(new Client("127.0.0.1", 10000));
  }
}

제가 main에 Server를 실행하고 Client를 실행했습니다. 즉, 두개의 쓰래드로 Server와 Client가 돌고 있는 것입니다.

콘솔에서 제가 값을 입력하면 Server에서 echo를 만들어서 클라이언트로 제대로 보내주고 있습니다. 마지막의 exit까지 깔끔하게 종료가 됩니다.

참조 - https://examples.javacodegeeks.com/core-java/nio/java-nio-socket-example/


여기까지 Java에서 NIO Socket 통신에 대한 설명이었습니다.


궁금한 점이나 잘못된 점이 있으면 댓글 부탁드립니다.