728x90

마지막으로 Java NIO non-blocking으로 만든 서버에서도 문제가 있었다.

accept 할 때마다 조건문으로 확인을 하고 Thread.sleep을 걸어주면, 그만큼 리소스의 낭비가 생기기 때문이다.

그리고 sleep 중간에 요청이 들어오면, sleep이 끝날 때까지 대기를 하기 때문에 그만큼 딜레이가 생긴다.

 

이런 이벤트들을 계속 확인하지 않고, 여러 이벤트를 추적할 수 있도록 변경해보자.

이번 내용은 네트워크 프로그래밍에서 배웠던 내용을 C에서 Java로 변경한 느낌일 것이다.

 

 

Selector 이론

SelectableChannel

 

selectableChannel에는 register 함수를 제공한다.

register는 Selector에 channel을 등록할 수 있다.

public abstract class SelectableChannel
    extends AbstractInterruptibleChannel
    implements Channel
{

    public abstract SelectableChannel configureBlocking(boolean block)
        throws IOException;

    public final SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException
    {
        return register(sel, ops, null);
    }
}

 

 

Selector

 

여러 Channel의 이벤트를 등록하고 준비된 이벤트를 모아서 조회할 수 있도록 한다.

public abstract class Selector implements Closable{
    public abstract int select() throws IOException;
    public abstract Set<SelectionKey> selectedKEys();
}

 

 

Selector를 만들고, channel에 selector와 관심 있는 이벤트를 등록한다.

channel의 register를 이용하면 내부 함수에서 다시 selector의 register를 호출하기 때문에, channel에 등록하더라도 selector에서 이벤트를 받을 수 있다.

관심있는 이벤트가 ops인데, 무슨 이벤트인지에 대한 정보이다.

이벤트들은 다음과 같다.

  • OP_READ: Channel에 읽기 준비가 완료되었다.
  • OP_WRITE: Channel에 쓸 준비가 완료되었다.
  • OP_ACCEPT: serverSocketChannel에서 accept 할 준비가 완료되었다.
  • OP_CONNECT: socketChannel에서 connect 할 준비가 완료되었다.

 

이렇게 해서 Accept를 등록하려는 코드는 다음과 같다.

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

 

 

등록이 끝나면 이제 Accept를 대기할 텐데, select를 사용하면 준비가 될 때까지 쓰레드 blocking이 된다.

 

while(true){
    selector.select();
    
    var selectedKEys = selector.selectedKeys().iterator();
    
    while(selectedKeys.hasNext()){
    	var key = selectedKeys.next();
        /////
        selectedKeys.remove();
    }
}

 

select에서 준비가 완료된 작업들이 발견된다면, 다음 line으로 이동한다.

준비가 완료된 작업을 가져올 때는 selectedKeys를 사용한다.

 

여기서 SelectionKey를 iterator로 가져오게 되는데, SelectionKey는 다음과 같다.

public abstract class SelectionKey {

    public abstract SelectableChannel channel();

    public abstract Selector selector();

    public abstract int interestOps();

    public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
    }

    public final boolean isWritable() {
        return (readyOps() & OP_WRITE) != 0;
    }

    public final boolean isConnectable() {
        return (readyOps() & OP_CONNECT) != 0;
    }

    public final boolean isAcceptable() {
        return (readyOps() & OP_ACCEPT) != 0;
    }

}

 

 

SelectionKey는 channel과 selector, 발생한 이벤트의 정보를 가지고 있는 객체이다.

여기에 모든 정보가 있기 때문에, 이거만 있으면 추가적인 정보를 넘길 필요가 없을 것이다.

 

이렇게 accept를 알아보았고, read인 경우에는 해당 이벤트를 받아 동일하게 처리하면 될 것이다.

 

 

Selector 실습

이제 Selector를 사용하여 더 최적화를 해보도록 하자.

저번에 작성했던 NIO 서버를 변경할 것이다.

@Slf4j
public class SelectorServer {

    private static ExecutorService executorService = Executors.newFixedThreadPool(50);

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");
        try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            Selector selector = Selector.open()){

            serverSocketChannel.bind(new InetSocketAddress("localhost", 8080));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while(true){

                selector.select();

                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();

                while(selectedKeys.hasNext()){
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();

                    if(key.isAcceptable()){
                        SocketChannel clientSocket = ((ServerSocketChannel)key.channel()).accept();
                        clientSocket.configureBlocking(false);
                        clientSocket.register(selector, SelectionKey.OP_READ);
                    }else if(key.isReadable()){
                        SocketChannel clientSocket = (SocketChannel) key.channel();

                        String responseBody = handleRequest(clientSocket);
                        sendResponse(clientSocket, responseBody);
                    }
                }

            }
        }
    }

    @SneakyThrows
    private static String handleRequest(SocketChannel clientSocket) {
        ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
        clientSocket.read(requestByteBuffer);

        requestByteBuffer.flip();
        String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
        log.info("request: {}", requestBody);

        return requestBody;
    }

    @SneakyThrows
    public static void sendResponse(SocketChannel clientSocket, String requestBody){
        CompletableFuture.runAsync(() -> {

            try {
                Thread.sleep(10);
                String content = "received: " + requestBody;
                ByteBuffer responseByteBuffer = ByteBuffer.wrap(content.getBytes());
                clientSocket.write(responseByteBuffer);
                clientSocket.close();
            } catch (Exception e) {
            }
        }, executorService);

    }
}

 

여기서 Accept 이벤트와 Read 이벤트를 처리하고 있는 것을 볼 수 있다.

각각의 이벤트 키로 해당 이벤트를 넣어 처리를 하며, 다른 쓰레드에서 비동기적으로 마지막 메서드를 실행하는 것을 볼 수 있다.

 

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java AIO  (0) 2024.03.19
Java IO Server 서버를 NIO로 변경  (0) 2024.03.19
Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
728x90

지금까지 배운 것을 바탕으로 서버를 만들고, 차례로 성능을 높여보자.

 

 

우선 간단한 기본 서버이다.

 

@Slf4j
public class JavaIOServer {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        try(ServerSocket serverSocket = new ServerSocket()){
            serverSocket.bind(new InetSocketAddress("localhost", 8080));

            while(true){

                Socket clientSocket = serverSocket.accept();

                byte[] requestBytes = new byte[1024];
                InputStream inputStream = clientSocket.getInputStream();
                inputStream.read(requestBytes);

                log.info("request: {}", new String(requestBytes).trim());

                OutputStream outputStream = clientSocket.getOutputStream();
                String response = "I am Server";
                outputStream.write(response.getBytes());
                outputStream.flush();
            }

        }
    }
}

 

간단한 서버이다.

while(true)를 돌며 계속 응답을 받고, 받은 후에 클라이언트에게 I am Server라는 값을 준다.

 

클라이언트이다.

요청을 보내고, 응답을 받은 후 종료된다.

@Slf4j
public class JavaIOClient {

    public static void main(String[] args) {
        try(Socket socket = new Socket()){
            socket.connect(new InetSocketAddress("localhost", 8080));

            var outputStream = socket.getOutputStream();
            String requestBody = "I am Seungkyu client";
            outputStream.write(requestBody.getBytes());
            outputStream.flush();

            InputStream inputStream = socket.getInputStream();
            byte[] responseBytes = new byte[1024];
            inputStream.read(responseBytes);

            log.info("result: {}", new String(responseBytes).trim());


        }catch (IOException e){
            throw new RuntimeException(e);
        }
    }
}

이런 식으로 서버에 계속 요청이 오는 것을 볼 수 있다.

 

이제 서버를 NIO로 변경해보자.

@Slf4j
public class JavaNIOBlockingServer {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {

            serverSocketChannel.bind(new InetSocketAddress("localhost", 8080));

            while(true){

                SocketChannel socketChannel = serverSocketChannel.accept();

                ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
                socketChannel.read(requestByteBuffer);
                requestByteBuffer.flip();
                String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
                log.info("request: {}", requestBody);

                ByteBuffer responseByteBuffer = ByteBuffer.wrap("I am Server".getBytes());
                socketChannel.write(responseByteBuffer);
                socketChannel.close();
            }

        }
    }
}

 

Channel을 이용하여 데이터를 읽고 쓰며, 버퍼를 사용하는 것을 볼 수 있다.

이렇게 속도를 높이겠지만, 아직도 accept에서 Blocking이 발생하고 있다.

 

그렇기에 이거를 Non-Blocking으로 바꿔보자

@Slf4j
public class JavaNIONonBlockingServer {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {

            serverSocketChannel.bind(new InetSocketAddress("localhost", 8080));
            serverSocketChannel.configureBlocking(false);

            while(true){

                SocketChannel socketChannel = serverSocketChannel.accept();

                if(socketChannel == null){
                    Thread.sleep(100);
                    continue;
                }

                ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
                while(socketChannel.read(requestByteBuffer) == 0){
                    log.info("reading");
                }
                socketChannel.read(requestByteBuffer);
                requestByteBuffer.flip();
                String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
                log.info("request: {}", requestBody);

                ByteBuffer responseByteBuffer = ByteBuffer.wrap("I am Server".getBytes());
                socketChannel.write(responseByteBuffer);
                socketChannel.close();
            }
        }
    }
}

 

configureBlocking에 false를 주어 Non-Blocking으로 동작하게 했다.

하지만 socketChannel이 null 일 때 Thread.sleep을 하기에 좋은 방법은 아닌 것 같다.

또한 requestBuffer에도 데이터가 있는지를 계속 체크 해주어야 하기 때문에, 이 부분도 수정이 필요해보인다.

 

이거를 1000개의 요청으로 얼마나 걸리는지 테스트를 해보도록 하자.

테스트 용도의 코드는 다음과 같다.

@Slf4j
public class JavaIOMultiClient {
    private static ExecutorService executorService = Executors.newFixedThreadPool(50);

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        long start = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            var future = CompletableFuture.runAsync(() -> {
                try (Socket socket = new Socket()) {
                    socket.connect(new InetSocketAddress("localhost", 8080));

                    OutputStream out = socket.getOutputStream();
                    String requestBody = "This is client";
                    out.write(requestBody.getBytes());
                    out.flush();

                    InputStream in = socket.getInputStream();
                    byte[] responseBytes = new byte[1024];
                    in.read(responseBytes);
                    log.info("result: {}", new String(responseBytes).trim());
                } catch (Exception e) {}
            }, executorService);

            futures.add(future);
        }

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        executorService.shutdown();
        log.info("end main");
        long end = System.currentTimeMillis();
        log.info("duration: {}", (end - start) / 1000.0);
    }
}

 

50개의 쓰레드로 1000개의 요청을 한다.

 

이렇게 실행해서 시간을 측정해보니, 약 12초가 나왔다.

 

여기서 코드를 수정해서 속도를 높일 수는 없을까?

 

서버에서 처리하는 코드를 CompletableFuture를 이용해 별개의 쓰레드에서 처리하도록 했다.

@Slf4j
public class JavaNIONonBlockingServer {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
            serverSocket.bind(new InetSocketAddress("localhost", 8080));
            serverSocket.configureBlocking(false);

            while (true) {
                SocketChannel clientSocket = serverSocket.accept();
                if (clientSocket == null) {
                    Thread.sleep(100);
                    continue;
                }

                CompletableFuture.runAsync(() -> {
                    handleRequest(clientSocket);
                });
            }
        }
    }

    @SneakyThrows
    private static void handleRequest(SocketChannel clientSocket) {
        ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
        while (clientSocket.read(requestByteBuffer) == 0) {
            log.info("Reading...");
        }
        requestByteBuffer.flip();
        String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
        log.info("request: {}", requestBody);

        Thread.sleep(10);

        ByteBuffer responeByteBuffer = ByteBuffer.wrap("This is server".getBytes());
        clientSocket.write(responeByteBuffer);
        clientSocket.close();
    }
}

 

이렇게 서버를 실행해보니 시간이 많이 준 것을 볼 수 있었다.

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java Selector  (1) 2024.03.20
Java AIO  (0) 2024.03.19
Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
728x90

Java에서 NIO에 대해 알아보자.

 

java New Input/Output의 줄인말이며, 파일과 네트워크에 데이터를 읽고 쓸 수 있는 API를 제공한다.

buffer기반이며, non-blocking을 지원한다.

 

java IO는 InputStream, OutputStream을 사용해서 방향이 정해져있었다면, NIO는 Channel을 이용해 읽고 쓰기가 한번에 가능하다.

또한 NIO는 아예 시작부터 buffer의 단위로 데이터를 주고 받는다.

 

Channel, Buffer

데이터를 읽을 때는 Channel read()를 사용하여 버퍼에 데이터를 저장한다.

데이터를 쓸 때는 Channel write()를 사용하여 버퍼에 데이터를 쓴다.

 

이 버퍼에서도 ByteBuffer부터 CharBuffer까지 다양한 타입의 버퍼가 존재한다.

 

Buffer의 위치

버퍼의 현재 커서를 나타내는 다양한 속성이 있다.

capacity: Buffer의 크기이다. Buffer 생성시에 결정된다.

position: Buffer의 현재 위치이다. 버퍼에서 데이터를 읽거나 쓸 때 시작하는 위치이며, 1Byte가 추가될 때마다 1씩 증가한다.

limit: Buffer에서 데이터를 읽거나 쓸 수 있는 마지막 위치이다. limit 이후로 데이터를 읽거나 쓰기 불가능하다.

mark: 현재 position의 위츠를 지정한다.

 

Buffer의 위치 메서드

flip: Buffer의 limit 위치를 현재 position으로 이동시키고, position을 0으로 리셋한다.

rewind: Buffer의 position 위치를 0으로 리셋. limit은 유지한다.

clear: Buffer의 limit 위치를 capacity 위치로 이동시키고, position을 0으로 리셋한다.

 

Buffer

Java IO는 JVM의 버퍼에만 접근이 가능하고, 시스템 자체에는 접근이 불가능했다.

하지만 Java NIO는 시스템 버퍼에도 접근이 가능하다.

NIO의 버퍼들이다.

DirectByteBuffer HeapByteBuffer
시스템 자체 메모리에 저장 JVM Heap 메모리에 저장
자체 메모리에서 JVM으로 복사를 하지 않기 때문에 속도가 빠르다. JVM으로 복사가 일어나므로 데이터를 읽고 쓰는 속도가 느리다.
allocate, deallocate가 느리다. gc에서 관리하기에 allocate, deallocate가 빠르다.

 

DirectByteBuffer는 allocateDirect() 함수로 생성이 가능하고, HeapByteBuffer는 allocate(), wrap() 함수로 생성 가능하다.

isDirect인지 DirectBuffer인지 아닌지 확인 가능하다.

 

FileChannel - Read

@Slf4j
public class FileChannelReadExample {

    public static void main(String[] args) throws IOException {
        log.info("start");

        var file = new File("path");

        try(var fileChannel = FileChannel.open(file.toPath())){
            var byteBuffer = ByteBuffer.allocateDirect(1024);
            fileChannel.read(byteBuffer);
            byteBuffer.flip();

            var result = StandardCharsets.UTF_8.decode(byteBuffer);
            log.info("result: {}", result);
        }
        log.info("end");
    }
}

 

allocateDirect로 커널을 사용하여 파일을 읽어온다.

NIO를 사용하기 때문에 FileChannel에서 읽고 쓰기가 가능하다.

 

아래는 ByteBuffer.wrap()으로 JVM버퍼를 사용하는 예제이다.

@Slf4j
public class FileChannelWriteExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        var mode = StandardOpenOption.WRITE;
        try(var fileChannel = FileChannel.open(file.toPath(), mode)){
            var buffer = ByteBuffer.wrap("Hi Seungkyu~~".getBytes());
            var result = fileChannel.write(buffer);
            log.info("result: {}", result);
        }
    }
}

 

SocketChannel

이제 배운것들로 소켓 서버와 클라이언트를 만들어보자.

 

우선 서버이다.

@Slf4j
public class ServerSocketChannelExample {

    public static void main(String[] args) throws IOException {

        log.info("start");

        try(var serverChannel = ServerSocketChannel.open()){
            var address = new InetSocketAddress("localhost", 8080);
            serverChannel.bind(address);

            try(var clientSocket = serverChannel.accept()){
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                clientSocket.read(byteBuffer);
                byteBuffer.flip();

                var request = new String(byteBuffer.array()).trim();
                log.info("request: {}", request);

                var response = "I am Server";
                var responseBuffer = ByteBuffer.wrap(response.getBytes());
                clientSocket.write(responseBuffer);
                responseBuffer.flip();
            }

        }

        log.info("end");
    }
}

 

Direct 버퍼를 만들어서 읽어온다.

Socket에서도 read와 flip을 사용해서 읽어노는 것을 볼 수 있다.

 

다음은 클라이언트이다.

@Slf4j
public class ClientSocketChannelExample {

    public static void main(String[] args) throws IOException {
        log.info("start");

        try(var socketChannel = SocketChannel.open()){
            var address = new InetSocketAddress("localhost", 8080);
            var connected = socketChannel.connect(address);
            log.info("connected: {}", connected);

            String request = "I am Client";
            ByteBuffer requestBuffer = ByteBuffer.wrap(request.getBytes());
            socketChannel.write(requestBuffer);
            requestBuffer.clear();
            
            ByteBuffer responseBuffer = ByteBuffer.allocateDirect(1024);
            while (socketChannel.read(responseBuffer) > 0){
                responseBuffer.flip();
                log.info("response: {}", StandardCharsets.UTF_8.decode(responseBuffer));
                responseBuffer.clear();
            }
        }
        log.info("end");
    }
}

 

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java AIO  (0) 2024.03.19
Java IO Server 서버를 NIO로 변경  (0) 2024.03.19
JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
RxJava Reactor  (2) 2024.03.14

+ Recent posts