728x90

마지막으로 Java NIO non-blocking으로 만든 서버에서도 문제가 있었다.

accept 할 때마다 조건문으로 확인을 하고 Thread.sleep을 걸어주면, 그만큼 리소스의 낭비가 생기기 때문이다.

그리고 sleep 중간에 요청이 들어오면, sleep이 끝날 때까지 대기를 하기 때문에 그만큼 딜레이가 생긴다.

 

이런 이벤트들을 계속 확인하지 않고, 여러 이벤트를 추적할 수 있도록 변경해보자.

이번 내용은 네트워크 프로그래밍에서 배웠던 내용을 C에서 Java로 변경한 느낌일 것이다.

 

 

Selector 이론

SelectableChannel

 

selectableChannel에는 register 함수를 제공한다.

register는 Selector에 channel을 등록할 수 있다.

public abstract class SelectableChannel
    extends AbstractInterruptibleChannel
    implements Channel
{

    public abstract SelectableChannel configureBlocking(boolean block)
        throws IOException;

    public final SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException
    {
        return register(sel, ops, null);
    }
}

 

 

Selector

 

여러 Channel의 이벤트를 등록하고 준비된 이벤트를 모아서 조회할 수 있도록 한다.

public abstract class Selector implements Closable{
    public abstract int select() throws IOException;
    public abstract Set<SelectionKey> selectedKEys();
}

 

 

Selector를 만들고, channel에 selector와 관심 있는 이벤트를 등록한다.

channel의 register를 이용하면 내부 함수에서 다시 selector의 register를 호출하기 때문에, channel에 등록하더라도 selector에서 이벤트를 받을 수 있다.

관심있는 이벤트가 ops인데, 무슨 이벤트인지에 대한 정보이다.

이벤트들은 다음과 같다.

  • OP_READ: Channel에 읽기 준비가 완료되었다.
  • OP_WRITE: Channel에 쓸 준비가 완료되었다.
  • OP_ACCEPT: serverSocketChannel에서 accept 할 준비가 완료되었다.
  • OP_CONNECT: socketChannel에서 connect 할 준비가 완료되었다.

 

이렇게 해서 Accept를 등록하려는 코드는 다음과 같다.

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

 

 

등록이 끝나면 이제 Accept를 대기할 텐데, select를 사용하면 준비가 될 때까지 쓰레드 blocking이 된다.

 

while(true){
    selector.select();
    
    var selectedKEys = selector.selectedKeys().iterator();
    
    while(selectedKeys.hasNext()){
    	var key = selectedKeys.next();
        /////
        selectedKeys.remove();
    }
}

 

select에서 준비가 완료된 작업들이 발견된다면, 다음 line으로 이동한다.

준비가 완료된 작업을 가져올 때는 selectedKeys를 사용한다.

 

여기서 SelectionKey를 iterator로 가져오게 되는데, SelectionKey는 다음과 같다.

public abstract class SelectionKey {

    public abstract SelectableChannel channel();

    public abstract Selector selector();

    public abstract int interestOps();

    public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
    }

    public final boolean isWritable() {
        return (readyOps() & OP_WRITE) != 0;
    }

    public final boolean isConnectable() {
        return (readyOps() & OP_CONNECT) != 0;
    }

    public final boolean isAcceptable() {
        return (readyOps() & OP_ACCEPT) != 0;
    }

}

 

 

SelectionKey는 channel과 selector, 발생한 이벤트의 정보를 가지고 있는 객체이다.

여기에 모든 정보가 있기 때문에, 이거만 있으면 추가적인 정보를 넘길 필요가 없을 것이다.

 

이렇게 accept를 알아보았고, read인 경우에는 해당 이벤트를 받아 동일하게 처리하면 될 것이다.

 

 

Selector 실습

이제 Selector를 사용하여 더 최적화를 해보도록 하자.

저번에 작성했던 NIO 서버를 변경할 것이다.

@Slf4j
public class SelectorServer {

    private static ExecutorService executorService = Executors.newFixedThreadPool(50);

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");
        try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            Selector selector = Selector.open()){

            serverSocketChannel.bind(new InetSocketAddress("localhost", 8080));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while(true){

                selector.select();

                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();

                while(selectedKeys.hasNext()){
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();

                    if(key.isAcceptable()){
                        SocketChannel clientSocket = ((ServerSocketChannel)key.channel()).accept();
                        clientSocket.configureBlocking(false);
                        clientSocket.register(selector, SelectionKey.OP_READ);
                    }else if(key.isReadable()){
                        SocketChannel clientSocket = (SocketChannel) key.channel();

                        String responseBody = handleRequest(clientSocket);
                        sendResponse(clientSocket, responseBody);
                    }
                }

            }
        }
    }

    @SneakyThrows
    private static String handleRequest(SocketChannel clientSocket) {
        ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
        clientSocket.read(requestByteBuffer);

        requestByteBuffer.flip();
        String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
        log.info("request: {}", requestBody);

        return requestBody;
    }

    @SneakyThrows
    public static void sendResponse(SocketChannel clientSocket, String requestBody){
        CompletableFuture.runAsync(() -> {

            try {
                Thread.sleep(10);
                String content = "received: " + requestBody;
                ByteBuffer responseByteBuffer = ByteBuffer.wrap(content.getBytes());
                clientSocket.write(responseByteBuffer);
                clientSocket.close();
            } catch (Exception e) {
            }
        }, executorService);

    }
}

 

여기서 Accept 이벤트와 Read 이벤트를 처리하고 있는 것을 볼 수 있다.

각각의 이벤트 키로 해당 이벤트를 넣어 처리를 하며, 다른 쓰레드에서 비동기적으로 마지막 메서드를 실행하는 것을 볼 수 있다.

 

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java AIO  (0) 2024.03.19
Java IO Server 서버를 NIO로 변경  (0) 2024.03.19
Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14

+ Recent posts