728x90

마지막으로 Java NIO non-blocking으로 만든 서버에서도 문제가 있었다.

accept 할 때마다 조건문으로 확인을 하고 Thread.sleep을 걸어주면, 그만큼 리소스의 낭비가 생기기 때문이다.

그리고 sleep 중간에 요청이 들어오면, sleep이 끝날 때까지 대기를 하기 때문에 그만큼 딜레이가 생긴다.

 

이런 이벤트들을 계속 확인하지 않고, 여러 이벤트를 추적할 수 있도록 변경해보자.

이번 내용은 네트워크 프로그래밍에서 배웠던 내용을 C에서 Java로 변경한 느낌일 것이다.

 

 

Selector 이론

SelectableChannel

 

selectableChannel에는 register 함수를 제공한다.

register는 Selector에 channel을 등록할 수 있다.

public abstract class SelectableChannel
    extends AbstractInterruptibleChannel
    implements Channel
{

    public abstract SelectableChannel configureBlocking(boolean block)
        throws IOException;

    public final SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException
    {
        return register(sel, ops, null);
    }
}

 

 

Selector

 

여러 Channel의 이벤트를 등록하고 준비된 이벤트를 모아서 조회할 수 있도록 한다.

public abstract class Selector implements Closable{
    public abstract int select() throws IOException;
    public abstract Set<SelectionKey> selectedKEys();
}

 

 

Selector를 만들고, channel에 selector와 관심 있는 이벤트를 등록한다.

channel의 register를 이용하면 내부 함수에서 다시 selector의 register를 호출하기 때문에, channel에 등록하더라도 selector에서 이벤트를 받을 수 있다.

관심있는 이벤트가 ops인데, 무슨 이벤트인지에 대한 정보이다.

이벤트들은 다음과 같다.

  • OP_READ: Channel에 읽기 준비가 완료되었다.
  • OP_WRITE: Channel에 쓸 준비가 완료되었다.
  • OP_ACCEPT: serverSocketChannel에서 accept 할 준비가 완료되었다.
  • OP_CONNECT: socketChannel에서 connect 할 준비가 완료되었다.

 

이렇게 해서 Accept를 등록하려는 코드는 다음과 같다.

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

 

 

등록이 끝나면 이제 Accept를 대기할 텐데, select를 사용하면 준비가 될 때까지 쓰레드 blocking이 된다.

 

while(true){
    selector.select();
    
    var selectedKEys = selector.selectedKeys().iterator();
    
    while(selectedKeys.hasNext()){
    	var key = selectedKeys.next();
        /////
        selectedKeys.remove();
    }
}

 

select에서 준비가 완료된 작업들이 발견된다면, 다음 line으로 이동한다.

준비가 완료된 작업을 가져올 때는 selectedKeys를 사용한다.

 

여기서 SelectionKey를 iterator로 가져오게 되는데, SelectionKey는 다음과 같다.

public abstract class SelectionKey {

    public abstract SelectableChannel channel();

    public abstract Selector selector();

    public abstract int interestOps();

    public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
    }

    public final boolean isWritable() {
        return (readyOps() & OP_WRITE) != 0;
    }

    public final boolean isConnectable() {
        return (readyOps() & OP_CONNECT) != 0;
    }

    public final boolean isAcceptable() {
        return (readyOps() & OP_ACCEPT) != 0;
    }

}

 

 

SelectionKey는 channel과 selector, 발생한 이벤트의 정보를 가지고 있는 객체이다.

여기에 모든 정보가 있기 때문에, 이거만 있으면 추가적인 정보를 넘길 필요가 없을 것이다.

 

이렇게 accept를 알아보았고, read인 경우에는 해당 이벤트를 받아 동일하게 처리하면 될 것이다.

 

 

Selector 실습

이제 Selector를 사용하여 더 최적화를 해보도록 하자.

저번에 작성했던 NIO 서버를 변경할 것이다.

@Slf4j
public class SelectorServer {

    private static ExecutorService executorService = Executors.newFixedThreadPool(50);

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");
        try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            Selector selector = Selector.open()){

            serverSocketChannel.bind(new InetSocketAddress("localhost", 8080));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while(true){

                selector.select();

                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();

                while(selectedKeys.hasNext()){
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();

                    if(key.isAcceptable()){
                        SocketChannel clientSocket = ((ServerSocketChannel)key.channel()).accept();
                        clientSocket.configureBlocking(false);
                        clientSocket.register(selector, SelectionKey.OP_READ);
                    }else if(key.isReadable()){
                        SocketChannel clientSocket = (SocketChannel) key.channel();

                        String responseBody = handleRequest(clientSocket);
                        sendResponse(clientSocket, responseBody);
                    }
                }

            }
        }
    }

    @SneakyThrows
    private static String handleRequest(SocketChannel clientSocket) {
        ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
        clientSocket.read(requestByteBuffer);

        requestByteBuffer.flip();
        String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
        log.info("request: {}", requestBody);

        return requestBody;
    }

    @SneakyThrows
    public static void sendResponse(SocketChannel clientSocket, String requestBody){
        CompletableFuture.runAsync(() -> {

            try {
                Thread.sleep(10);
                String content = "received: " + requestBody;
                ByteBuffer responseByteBuffer = ByteBuffer.wrap(content.getBytes());
                clientSocket.write(responseByteBuffer);
                clientSocket.close();
            } catch (Exception e) {
            }
        }, executorService);

    }
}

 

여기서 Accept 이벤트와 Read 이벤트를 처리하고 있는 것을 볼 수 있다.

각각의 이벤트 키로 해당 이벤트를 넣어 처리를 하며, 다른 쓰레드에서 비동기적으로 마지막 메서드를 실행하는 것을 볼 수 있다.

 

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java AIO  (0) 2024.03.19
Java IO Server 서버를 NIO로 변경  (0) 2024.03.19
Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
728x90

AsynchronousChannel을 지원한다고 한다.

callback 함수를 지정할 수 있고, future로 반환을 지원한다고 한다.

callback과 future를 사용하면 비동기적인 로직이 가능할 것 같다.

 

  • File - callback

우선 callback 함수를 이용해서 파일을 읽어보자.

@Slf4j
public class AsyncFileChannelReadCallbackExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");
        var file = new File("path");
        var channel = AsynchronousFileChannel.open(file.toPath());
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);

        channel.read(byteBuffer, 0, null, new CompletionHandler<Integer, Object>() {
            @SneakyThrows
            @Override
            public void completed(Integer result, Object attachment) {
                byteBuffer.flip();
                var resultString = StandardCharsets.UTF_8.decode(byteBuffer);
                log.info("result: {}", resultString);
                channel.close();
            }

            @Override
            public void failed(Throwable exc, Object attachment) {

            }
        });

        while(channel.isOpen()){
            log.info("Reading...");
        }
        log.info("end");
    }
}

 

completed 함수로 callback을 지정해주어, 파일이 모두 읽히면 result: {}로 내용을 출력할 수 있도록 하였다.

파일이 열린동안 메인 쓰레드가 닫히지 않도록 channel.isOpen으로 대기를 걸어주었다.

 

이렇게 callback으로 파일의 내용이 출력된 것을 볼 수 있다.

 

  • File - future
@Slf4j
public class AsyncFileChannelReadFutureExample {
    
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");
        
        var file = new File("path");
        
        try(var channel = AsynchronousFileChannel.open(file.toPath())){
            var buffer = ByteBuffer.allocateDirect(1024);
            Future<Integer> channelRead = channel.read(buffer, 0);
            while(!channelRead.isDone()){
                log.info("Reading...");
            }
            
            buffer.flip();
            var result = StandardCharsets.UTF_8.decode(buffer);
            log.info("result: {}", result);
        }
        
        log.info("end");
    }
}

 

이제 Future로 받고, 해당 Future가 끝나기를 기다린 후 파일의 내용을 출력하게 되었다.

코드는 깔끔해졌지만, while에서 동기적으로 동작하게 된다.

 

  • socket - callback

이번엔 소켓이다.

accept하는 부분을 비동기적으로 변경하였다.

@Slf4j
public class AsyncServerSocketCallbackExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        var serverSocketChannel = AsynchronousServerSocketChannel.open();
        var address = new InetSocketAddress("localhost", 8080);
        serverSocketChannel.bind(address);

        serverSocketChannel.accept(null, new CompletionHandler<>() {

            @Override
            public void completed(AsynchronousSocketChannel clientSocket, Object attachment) {
                log.info("accepted");
                var requestBuffer = ByteBuffer.allocateDirect(1024);

                clientSocket.read(requestBuffer, null, new CompletionHandler<>() {
                    @SneakyThrows
                    @Override
                    public void completed(Integer result, Object attachment) {
                        requestBuffer.flip();
                        var request = StandardCharsets.UTF_8.decode(requestBuffer);
                        log.info("request: {}", request);

                        var response = "I am server";
                        var responseBuffer = ByteBuffer.wrap(response.getBytes());
                        clientSocket.write(responseBuffer);
                        clientSocket.close();
                        log.info("end");
                    }
                    @Override
                    public void failed(Throwable exc, Object attachment) {

                    }
                });
            }
            @Override
            public void failed(Throwable exc, Object attachment) {

            }
        });
        Thread.sleep(1000000);
        log.info("end");

    }
}

 

Callback이 2개이다.

바깥은 accept하는 부분이고, 안 쪽은 read하는 부분의 callback이다.

 

 

  • socket - Future
@Slf4j
public class AsyncServerSocketFutureExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");
        var serverSocketChannel = AsynchronousServerSocketChannel.open();
        var address = new InetSocketAddress("localhost", 8080);
        serverSocketChannel.bind(address);

        Future<AsynchronousSocketChannel> clientSocketFuture = serverSocketChannel.accept();
        while(!clientSocketFuture.isDone()){
            Thread.sleep(100);
            log.info("Wainting");
        }
        var clientSocket = clientSocketFuture.get();

        var requestBuffer = ByteBuffer.allocateDirect(1024);
        Future<Integer> channelRead = clientSocket.read(requestBuffer);
        while(!channelRead.isDone()){
            log.info("Reading...");
        }

        requestBuffer.flip();
        var request = StandardCharsets.UTF_8.decode(requestBuffer);
        log.info("request: {}", request);

        var response = "This is server.";
        var responseBuffer = ByteBuffer.wrap(response.getBytes());
        clientSocket.write(responseBuffer);
        clientSocket.close();
        log.info("end client");

    }
}

 

Future로 변경을 해보았다.

File 때와 마찬가지로 while에서 동기적으로 작동하게 되며, 보완이 필요하다고 생각된다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java Selector  (0) 2024.03.20
Java IO Server 서버를 NIO로 변경  (0) 2024.03.19
Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
728x90

지금까지 배운 것을 바탕으로 서버를 만들고, 차례로 성능을 높여보자.

 

 

우선 간단한 기본 서버이다.

 

@Slf4j
public class JavaIOServer {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        try(ServerSocket serverSocket = new ServerSocket()){
            serverSocket.bind(new InetSocketAddress("localhost", 8080));

            while(true){

                Socket clientSocket = serverSocket.accept();

                byte[] requestBytes = new byte[1024];
                InputStream inputStream = clientSocket.getInputStream();
                inputStream.read(requestBytes);

                log.info("request: {}", new String(requestBytes).trim());

                OutputStream outputStream = clientSocket.getOutputStream();
                String response = "I am Server";
                outputStream.write(response.getBytes());
                outputStream.flush();
            }

        }
    }
}

 

간단한 서버이다.

while(true)를 돌며 계속 응답을 받고, 받은 후에 클라이언트에게 I am Server라는 값을 준다.

 

클라이언트이다.

요청을 보내고, 응답을 받은 후 종료된다.

@Slf4j
public class JavaIOClient {

    public static void main(String[] args) {
        try(Socket socket = new Socket()){
            socket.connect(new InetSocketAddress("localhost", 8080));

            var outputStream = socket.getOutputStream();
            String requestBody = "I am Seungkyu client";
            outputStream.write(requestBody.getBytes());
            outputStream.flush();

            InputStream inputStream = socket.getInputStream();
            byte[] responseBytes = new byte[1024];
            inputStream.read(responseBytes);

            log.info("result: {}", new String(responseBytes).trim());


        }catch (IOException e){
            throw new RuntimeException(e);
        }
    }
}

이런 식으로 서버에 계속 요청이 오는 것을 볼 수 있다.

 

이제 서버를 NIO로 변경해보자.

@Slf4j
public class JavaNIOBlockingServer {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {

            serverSocketChannel.bind(new InetSocketAddress("localhost", 8080));

            while(true){

                SocketChannel socketChannel = serverSocketChannel.accept();

                ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
                socketChannel.read(requestByteBuffer);
                requestByteBuffer.flip();
                String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
                log.info("request: {}", requestBody);

                ByteBuffer responseByteBuffer = ByteBuffer.wrap("I am Server".getBytes());
                socketChannel.write(responseByteBuffer);
                socketChannel.close();
            }

        }
    }
}

 

Channel을 이용하여 데이터를 읽고 쓰며, 버퍼를 사용하는 것을 볼 수 있다.

이렇게 속도를 높이겠지만, 아직도 accept에서 Blocking이 발생하고 있다.

 

그렇기에 이거를 Non-Blocking으로 바꿔보자

@Slf4j
public class JavaNIONonBlockingServer {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {

            serverSocketChannel.bind(new InetSocketAddress("localhost", 8080));
            serverSocketChannel.configureBlocking(false);

            while(true){

                SocketChannel socketChannel = serverSocketChannel.accept();

                if(socketChannel == null){
                    Thread.sleep(100);
                    continue;
                }

                ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
                while(socketChannel.read(requestByteBuffer) == 0){
                    log.info("reading");
                }
                socketChannel.read(requestByteBuffer);
                requestByteBuffer.flip();
                String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
                log.info("request: {}", requestBody);

                ByteBuffer responseByteBuffer = ByteBuffer.wrap("I am Server".getBytes());
                socketChannel.write(responseByteBuffer);
                socketChannel.close();
            }
        }
    }
}

 

configureBlocking에 false를 주어 Non-Blocking으로 동작하게 했다.

하지만 socketChannel이 null 일 때 Thread.sleep을 하기에 좋은 방법은 아닌 것 같다.

또한 requestBuffer에도 데이터가 있는지를 계속 체크 해주어야 하기 때문에, 이 부분도 수정이 필요해보인다.

 

이거를 1000개의 요청으로 얼마나 걸리는지 테스트를 해보도록 하자.

테스트 용도의 코드는 다음과 같다.

@Slf4j
public class JavaIOMultiClient {
    private static ExecutorService executorService = Executors.newFixedThreadPool(50);

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        long start = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            var future = CompletableFuture.runAsync(() -> {
                try (Socket socket = new Socket()) {
                    socket.connect(new InetSocketAddress("localhost", 8080));

                    OutputStream out = socket.getOutputStream();
                    String requestBody = "This is client";
                    out.write(requestBody.getBytes());
                    out.flush();

                    InputStream in = socket.getInputStream();
                    byte[] responseBytes = new byte[1024];
                    in.read(responseBytes);
                    log.info("result: {}", new String(responseBytes).trim());
                } catch (Exception e) {}
            }, executorService);

            futures.add(future);
        }

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        executorService.shutdown();
        log.info("end main");
        long end = System.currentTimeMillis();
        log.info("duration: {}", (end - start) / 1000.0);
    }
}

 

50개의 쓰레드로 1000개의 요청을 한다.

 

이렇게 실행해서 시간을 측정해보니, 약 12초가 나왔다.

 

여기서 코드를 수정해서 속도를 높일 수는 없을까?

 

서버에서 처리하는 코드를 CompletableFuture를 이용해 별개의 쓰레드에서 처리하도록 했다.

@Slf4j
public class JavaNIONonBlockingServer {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
            serverSocket.bind(new InetSocketAddress("localhost", 8080));
            serverSocket.configureBlocking(false);

            while (true) {
                SocketChannel clientSocket = serverSocket.accept();
                if (clientSocket == null) {
                    Thread.sleep(100);
                    continue;
                }

                CompletableFuture.runAsync(() -> {
                    handleRequest(clientSocket);
                });
            }
        }
    }

    @SneakyThrows
    private static void handleRequest(SocketChannel clientSocket) {
        ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
        while (clientSocket.read(requestByteBuffer) == 0) {
            log.info("Reading...");
        }
        requestByteBuffer.flip();
        String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
        log.info("request: {}", requestBody);

        Thread.sleep(10);

        ByteBuffer responeByteBuffer = ByteBuffer.wrap("This is server".getBytes());
        clientSocket.write(responeByteBuffer);
        clientSocket.close();
    }
}

 

이렇게 서버를 실행해보니 시간이 많이 준 것을 볼 수 있었다.

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java Selector  (0) 2024.03.20
Java AIO  (0) 2024.03.19
Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
728x90

Java에서 NIO에 대해 알아보자.

 

java New Input/Output의 줄인말이며, 파일과 네트워크에 데이터를 읽고 쓸 수 있는 API를 제공한다.

buffer기반이며, non-blocking을 지원한다.

 

java IO는 InputStream, OutputStream을 사용해서 방향이 정해져있었다면, NIO는 Channel을 이용해 읽고 쓰기가 한번에 가능하다.

또한 NIO는 아예 시작부터 buffer의 단위로 데이터를 주고 받는다.

 

Channel, Buffer

데이터를 읽을 때는 Channel read()를 사용하여 버퍼에 데이터를 저장한다.

데이터를 쓸 때는 Channel write()를 사용하여 버퍼에 데이터를 쓴다.

 

이 버퍼에서도 ByteBuffer부터 CharBuffer까지 다양한 타입의 버퍼가 존재한다.

 

Buffer의 위치

버퍼의 현재 커서를 나타내는 다양한 속성이 있다.

capacity: Buffer의 크기이다. Buffer 생성시에 결정된다.

position: Buffer의 현재 위치이다. 버퍼에서 데이터를 읽거나 쓸 때 시작하는 위치이며, 1Byte가 추가될 때마다 1씩 증가한다.

limit: Buffer에서 데이터를 읽거나 쓸 수 있는 마지막 위치이다. limit 이후로 데이터를 읽거나 쓰기 불가능하다.

mark: 현재 position의 위츠를 지정한다.

 

Buffer의 위치 메서드

flip: Buffer의 limit 위치를 현재 position으로 이동시키고, position을 0으로 리셋한다.

rewind: Buffer의 position 위치를 0으로 리셋. limit은 유지한다.

clear: Buffer의 limit 위치를 capacity 위치로 이동시키고, position을 0으로 리셋한다.

 

Buffer

Java IO는 JVM의 버퍼에만 접근이 가능하고, 시스템 자체에는 접근이 불가능했다.

하지만 Java NIO는 시스템 버퍼에도 접근이 가능하다.

NIO의 버퍼들이다.

DirectByteBuffer HeapByteBuffer
시스템 자체 메모리에 저장 JVM Heap 메모리에 저장
자체 메모리에서 JVM으로 복사를 하지 않기 때문에 속도가 빠르다. JVM으로 복사가 일어나므로 데이터를 읽고 쓰는 속도가 느리다.
allocate, deallocate가 느리다. gc에서 관리하기에 allocate, deallocate가 빠르다.

 

DirectByteBuffer는 allocateDirect() 함수로 생성이 가능하고, HeapByteBuffer는 allocate(), wrap() 함수로 생성 가능하다.

isDirect인지 DirectBuffer인지 아닌지 확인 가능하다.

 

FileChannel - Read

@Slf4j
public class FileChannelReadExample {

    public static void main(String[] args) throws IOException {
        log.info("start");

        var file = new File("path");

        try(var fileChannel = FileChannel.open(file.toPath())){
            var byteBuffer = ByteBuffer.allocateDirect(1024);
            fileChannel.read(byteBuffer);
            byteBuffer.flip();

            var result = StandardCharsets.UTF_8.decode(byteBuffer);
            log.info("result: {}", result);
        }
        log.info("end");
    }
}

 

allocateDirect로 커널을 사용하여 파일을 읽어온다.

NIO를 사용하기 때문에 FileChannel에서 읽고 쓰기가 가능하다.

 

아래는 ByteBuffer.wrap()으로 JVM버퍼를 사용하는 예제이다.

@Slf4j
public class FileChannelWriteExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        var mode = StandardOpenOption.WRITE;
        try(var fileChannel = FileChannel.open(file.toPath(), mode)){
            var buffer = ByteBuffer.wrap("Hi Seungkyu~~".getBytes());
            var result = fileChannel.write(buffer);
            log.info("result: {}", result);
        }
    }
}

 

SocketChannel

이제 배운것들로 소켓 서버와 클라이언트를 만들어보자.

 

우선 서버이다.

@Slf4j
public class ServerSocketChannelExample {

    public static void main(String[] args) throws IOException {

        log.info("start");

        try(var serverChannel = ServerSocketChannel.open()){
            var address = new InetSocketAddress("localhost", 8080);
            serverChannel.bind(address);

            try(var clientSocket = serverChannel.accept()){
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                clientSocket.read(byteBuffer);
                byteBuffer.flip();

                var request = new String(byteBuffer.array()).trim();
                log.info("request: {}", request);

                var response = "I am Server";
                var responseBuffer = ByteBuffer.wrap(response.getBytes());
                clientSocket.write(responseBuffer);
                responseBuffer.flip();
            }

        }

        log.info("end");
    }
}

 

Direct 버퍼를 만들어서 읽어온다.

Socket에서도 read와 flip을 사용해서 읽어노는 것을 볼 수 있다.

 

다음은 클라이언트이다.

@Slf4j
public class ClientSocketChannelExample {

    public static void main(String[] args) throws IOException {
        log.info("start");

        try(var socketChannel = SocketChannel.open()){
            var address = new InetSocketAddress("localhost", 8080);
            var connected = socketChannel.connect(address);
            log.info("connected: {}", connected);

            String request = "I am Client";
            ByteBuffer requestBuffer = ByteBuffer.wrap(request.getBytes());
            socketChannel.write(requestBuffer);
            requestBuffer.clear();
            
            ByteBuffer responseBuffer = ByteBuffer.allocateDirect(1024);
            while (socketChannel.read(responseBuffer) > 0){
                responseBuffer.flip();
                log.info("response: {}", StandardCharsets.UTF_8.decode(responseBuffer));
                responseBuffer.clear();
            }
        }
        log.info("end");
    }
}

 

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java AIO  (0) 2024.03.19
Java IO Server 서버를 NIO로 변경  (0) 2024.03.19
JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
RxJava Reactor  (2) 2024.03.14
728x90

Java에서 IO에 대한 부분을 알아보자.

 

우선 Java IO는 파일과 네트워크에 데이터를 읽고 쓸 수 있는 API를 제공하며, blocking으로 동작한다.

 

InputStream

우선 InputStream부터 살펴보자.

public abstract class InputStream implements Closeable{
    public abstract int read() throws IOException;
    public void close() throws IOException;
}

 

read는 stream으로 데이터를 읽고 읽은 값을 반환한다. 만약 -1을 반환한다면 파일의 끝인 EOF이다.

close는 stream을 닫고 더 이상 데이터를 읽지 않는다.

 

이 InputStream은 다양한 구현체가 존재한다.

파일을 읽어오는 FileInputStream, 바이트 배열을 읽어오는 ByteArrayInputStream, 별도의 버퍼를 만들어서 읽어오는 BufferedInputStream 등이 있다.

 

ByteArrayInputStream

byte array로부터 값을 읽을 수 있다.

 

@Slf4j
public class ByteArrayInputStreamAllExample {
    public static void main(String[] args) throws IOException {
        log.info("start");

        var bytes = new byte[]{100, 101, 102, 103, 104, 105};

        try(var byteArrayInputStream = new ByteArrayInputStream(bytes)){
            var values = byteArrayInputStream.readAllBytes();
            log.info("values: {}", values);
        }
        log.info("end");
    }
}

 

해당 코드로 알아보도록 하자.

 

byteArray에서 ByteArrayInputStream을 통해 데이터를 모두 읽어오는 것을 볼 수 있다.

 

readAllBytes는 모두 읽어오는 메서드이고, read를 사용하면 하나씩 데이터를 가져오게 된다.

@Slf4j
public class ByteArrayInputStreamExample {

    public static void main(String[] args) throws IOException {
        log.info("start");
        var bytes = new byte[]{100, 101, 102, 103, 104};

        try(var byteArrayInputStream = new ByteArrayInputStream(bytes)){
            var value = 0;

            while((value = byteArrayInputStream.read()) != -1)
                log.info("value: {}", value);
        }

        log.info("end");
    }
}

 

read를 사용하면, 하나씩 읽어온다.

 

FileInputStream

file로부터 byte 단위로 값을 읽을 수 있다.

file을 읽어오는 동안에는 blocking이 일어난다.

 

@Slf4j
public class FileInputStreamExample {
    public static void main(String[] args) throws IOException {
        log.info("start main");
        var file = new File("path");

        try (var fis = new FileInputStream(file)) {
            var value = 0;

            while ((value = fis.read()) != -1) {
                log.info("value: {}", (char)value);
            }
        }
        log.info("end main");
    }
}

 

이렇게 해당 파일을 읽어오게 된다.

읽을 때는 한 바이트 씩 읽어오게 된다.

 

BufferedInputStream

다른 inputStream과 조합해서 사용한다.

이름 그대로 읽어올 때마다, buffer의 사이즈만큼 미리 읽어오게 된다.

@Slf4j
public class BufferedInputStreamExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        try(var fileInputStream = new FileInputStream(file)){
            try(var bufferedInputStream = new BufferedInputStream(fileInputStream)){
                var value = 0;

                while((value = bufferedInputStream.read()) != -1){
                    log.info("value : {}", (char)value);
                }
            }
        }
    }
}

 

이렇게 항상 File에 접근하는 것이 아닌 buffer에 접근하여 데이터를 가져오는 것을 볼 수 있다.

 

SocketInputStream

SocketInputStream은 Public이 아니기 때문에 직접 접근이 불가능하다.

호출 시에 blocking이 발생한다.

 

간단한 서버를 만들어보았다.

@Slf4j
public class ServerSocketInputStreamExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        ServerSocket serverSocket = new ServerSocket(8080);

        Socket clientSocket = serverSocket.accept();

        var inputStream = clientSocket.getInputStream();
        var bufferedInputStream = new BufferedInputStream(inputStream);
        byte[] buffer = new byte[1024];
        int bytesRead = bufferedInputStream.read(buffer);
        String inputLine = new String(buffer, 0, bytesRead);
        log.info("bytes: {}", inputLine);

        clientSocket.close();
        serverSocket.close();

        log.info("end");

    }
}

 

서버와 클라이언트의 동작 방식은 네트워크 프로그래밍에서 배웠기 때문에 다루지 않도록 하겠다.

 

서버 소캣으로부터 inputStream을 만들어서 버퍼로 가져오는 코드이다.

하지만 현재는 클라이언트가 없어서 아마 아무 결과도 나오지 않을 것이다.

 

 

OutputStream

public abstract class OutputStream implements Closeable, Flushable{
    public abstract void write(int b) throws IOException;
    public void flush() throws IOException{}
    public void close() throws IOException{}
}

 

inputStream과는 다르게 버퍼를 출력하고 비우는 Flush가 존재하는 것을 볼 수 있다.

 

당연히 OutputStream도 InputStream처럼 많은 구현체가 존재하는 것을 알 수 있다.

 

 

ByteArrayOutputStream

@Slf4j
public class ByteArrayOutputStreamExample {

    public static void main(String[] args) throws IOException {
        log.info("start");

        try(var byteArrayOutputStream = new ByteArrayOutputStream()){
            byteArrayOutputStream.write(100);
            byteArrayOutputStream.write(101);
            byteArrayOutputStream.write(102);
            byteArrayOutputStream.write(103);
            byteArrayOutputStream.write(104);

            var bytes = byteArrayOutputStream.toByteArray();
            log.info("bytes: {}", bytes);
        }

        log.info("end");
    }
}

 

byteArrayOutputStream에 하나씩 작성하고 마지막에 byteArray로 변환하여 값을 출력한다.

 

 

ByteArrayOutputStream 

file에 값을 쓰는 OutputStream이다.

해당 메서드도 blocking이 일어난다.

@Slf4j
public class FileOutputStreamExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        try(var fileOutputStream = new FileOutputStream(file)){
            var data = "Hi Seungkyu";

            fileOutputStream.write(data.getBytes());
            fileOutputStream.flush();
        }
    }
}

 

write 후에 flush까지 호출해 주도록 한다.

 

 

BufferedOutputStream

이것도 마찬가지로 다른 outputStream과 조합하여 사용한다.

write 호출하면 buffer에만 write 하고, 후에 Flush를 호출하여 한 번에 outputStream에 작성한다.

@Slf4j
public class BufferedOutputStreamExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        var fileOutputStream = new FileOutputStream(file);
        try(var bufferedOutputStream = new BufferedOutputStream(fileOutputStream)){
            var data = "Hi seungkyu!!";

            bufferedOutputStream.write(data.getBytes());
            bufferedOutputStream.flush();
        }
    }
}

 

write 후에 작성을 위하여 flush를 사용해야 한다.

 

 

SocketOutputStream

SocketOutputStream도 Public이 아니기 때문에 직접 접근이 불가능하다.

호출 시에 blocking이 발생한다.

 

@Slf4j
public class ServerSocketOutputStreamExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        ServerSocket serverSocket = new ServerSocket(8080);

        Socket clientSocket = serverSocket.accept();

        byte[] buffer = new byte[1024];
        clientSocket.getInputStream().read(buffer);

        var outputStream = clientSocket.getOutputStream();
        var byteOutputStream = new BufferedOutputStream(outputStream);
        var bytes = "Hi Seungkyu".getBytes();
        byteOutputStream.write(bytes);
        byteOutputStream.flush();

        clientSocket.close();
        serverSocket.close();
        log.info("end");
    }
}

 

이런 식으로 내용을 작성하게 된다.

 

Java IO Reader, Writer

Stream에서 character 단위로 읽고 쓸 수 있다.

문자 인코딩을 지원하여 바로 문자로 가져올 수 있다는 것이다.

blocking으로 동작하게 된다.

 

Reader

바로 인코딩을 지정하여 3바이트씩 가져오게 된다.

@Slf4j
public class FileReaderExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        var charset = StandardCharsets.UTF_8;

        try(var fileInputStream = new FileReader(file, charset)){
            var value = 0;

            while((value = fileInputStream.read()) != -1){
                log.info("value: {}", (char)value);
            }
        }
    }
}

 

 

Writer

@Slf4j
public class FileWriterExample {

    public static void main(String[] args) throws IOException {

        var file = new File("path");

        var charset = StandardCharsets.UTF_8;
        try(var fileInputStream = new FileWriter(file, charset)){
            var data = "hihihihi";
            fileInputStream.write(data);
        }
    }
}

 

바로 문자열을 write하게 된다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java IO Server 서버를 NIO로 변경  (0) 2024.03.19
Java NIO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
RxJava Reactor  (2) 2024.03.14
Project reactor  (0) 2024.03.14
728x90

Multi

0..n개의 item을 전달

에러가 발생하면 error signal을 전달하고, 모든 item을 전달하면 complete signal을 전달하고 종료한다.

backPressure를 지원하며, Reactor의 Flux와 유사하다.

 

Multi의 Subscriber이다.

다른 Subscriber들과 메서드 이름이 좀 다르긴 하지만, 기능은 생각하는 그대로이다.

@Slf4j
@RequiredArgsConstructor
public class SimpleMultiSubscriber<T> implements MultiSubscriber<T> {

    private final Integer count;

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("fail: {}", failure.getMessage());
    }

    @Override
    public void onCompletion() {
        log.info("completion");
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(count);
        log.info("subscribe");
    }
}

 

예제로 실행을 해보자.

@Slf4j
public class MultiExample {

    public static void main(String[] args) {
        getItems()
                .subscribe()
                .withSubscriber(
                        new SimpleMultiSubscriber<>(Integer.MAX_VALUE)
                );
    }

    private static Multi<Integer> getItems(){
        return Multi.createFrom().items(1, 2, 3, 4, 5);
    }
}

 

subscribe에 Subscriber를 넣는 것이 아니라, withSubscriber에 넘겨야 한다.

 

Uni

0..1개의 item을 전달

에러가 발생하면 error signal 전달하고 종료, 모든 item을 전달했다면 complete signal을 전달하고 종료한다.

Reactor의 Mono와 유사하다.

 

@Slf4j
@RequiredArgsConstructor
public class SimpleUniSubscriber<T> implements UniSubscriber<T> {

    private final Integer count;
    private UniSubscription subscription;

    @Override
    public void onSubscribe(UniSubscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
        log.info("subscribe");
    }

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("error: {}", failure.getMessage());
    }
}

 

하나면 보내면 complete이기 때문에 따로 complete 메서드는 없다.

 

아래의 코드로 실행을 해보자.

@Slf4j
public class UniExample {

    public static void main(String[] args) {
        getItem()
                .subscribe()
                .withSubscriber(new SimpleUniSubscriber<>(Integer.MAX_VALUE));
    }

    private static Uni<Integer> getItem(){
        return Uni.createFrom().item(1);
    }
}

 

그럼 하나의 값이 나오게 될 것이다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
RxJava Reactor  (2) 2024.03.14
Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
728x90

Netflix에서 구현한 Reactive Stream이라고 한다.

 

Flowable

0..n개의 item을 전달한다.

에러가 발생하면 error signal을 전달하고 종료하며, 모든 item을 전달하면 complete signal을 전달하고 종료한다.

중요한 backPressure를 지원한다고 한다.

이거까지만 보면 Flux와 굉장히 비슷해 보이는 거 같다.

덕분에 다행이도 저번에 작성했던 subscriber를 사용할 수 있다.

 

  • subscribe - Flowable

저번에 사용했던 코드와 크게 다르지 않다.

@Slf4j
public class FlowableExample {

    public static void main(String[] args) {
        log.info("start main");

        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");
    }

    private static Flowable<Integer> getItems(){
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

딱 생각했던 대로 출력이 된다.

 

  • backPressure - Flowable

전에 작성했던 ContinuousRequestSubscriber를 사용해서 backPressure를 확인해보자.

@Slf4j
public class FlowableContinuousRequestSubscriberExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new ContinuousRequestSubscriber<>());
        log.info("end main");
    }

    private static Flowable<Integer> getItems(){
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 요청에 따라 데이터를 주는 것을 볼 수 있다.

 

Observable

0..n개의 item을 전달한다.

에러가 발생하면 error signal을 전달하고 종료하며, 모든 item을 전달하면 complete signal을 전달하고 종료한다.

하지만 backPressure를 지원하지는 않는다고 한다.

 

subscriber가 요청하지 않더라도 item을 전달하는 형식이다.

 

간단하게 작성하여 실행해 보았는데, request를 하지 않아도 지속적으로 데이터를 주는 것을 볼 수 있었다.

 

Single

1개의 item을 전달 후 바로 onComplete signal wjsekf

1개의 item이 없다면 onError signal을 전달하고, 에러가 발생해도 onError signal을 전달한다.

 

@Slf4j
public class SimpleObserver implements Observer {

    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
        this.disposable = d;
    }

    @Override
    public void onNext(@NonNull Object o) {
        log.info("item: {}", o);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

이렇게 Oberser를 작성하고 예제를 실행해보자.

@Slf4j
public class SingleExample {

    public static void main(String[] args) {
        getItem()
                .subscribe(new SimpleSingleObserver<>());

        getNullItem()
                .subscribe(new SimpleSingleObserver<>());
    }

    private static Single<Integer> getItem(){
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(1);
        });
    }

    private static Single<Integer> getNullItem(){
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(null);
        });
    }
}

 

위에 코드는 성공이고, 밑의 코드는 null을 반환한다.

Single에서는 null을 반환하면 에러가 뜨는 것을 확인 할 수 있다.

 

Maybe

1개의 Item을 전달 수 바로 OnComplete signal을 전달한다.

1개의 Item이 없어도 onComplete signal 전달 가능

에러가 발생했다면 onError signal 전달

 

@Slf4j
public class SimpleMaybeObserver<T> implements MaybeObserver<T> {

    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        log.info("subscribe");
    }

    @Override
    public void onSuccess(@NonNull T t) {
        log.info("item: {}", t);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

바로 onComplete를 전달 할 수도 있어서, 해당 메서드도 implements가 된 것을 볼 수 있다.

 

@Slf4j
public class MaybeExample {

    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());

        maybeGetNullItem()
                .subscribe(new SimpleMaybeObserver<>());

    }

    private static Maybe<Integer> maybeGetItem(){
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onSuccess(1);
        });
    }

    private static Maybe<Integer> maybeGetNullItem(){
        return Maybe.create(maybeEmitter -> {
           maybeEmitter.onComplete();
        });
    }
}

 

바로 onComplete 호출이 가능한 것을 확인 할 수 있다.

 

Completable

onComplete 혹은 onError signal만 전달한다.

값이 아닌 사건을 전달해준다.

 

@Slf4j
public class SimpleCompletableObserver implements CompletableObserver {

    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }
}

 

이런 식으로 구성이 되어 있고

@Slf4j
public class CompletableExample {

    public static void main(String[] args) {
        getCompletion()
                .subscribe(new SimpleCompletableObserver());
    }

    private static Completable getCompletion(){
        return Completable.create(completableEmitter -> {
            Thread.sleep(1000);
            completableEmitter.onComplete();
        });
    }
}

 

이렇게 사용한다고 한다.

자세한 부분까지는 모르겠다.

 

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
728x90

Pivotal에서 개발한 Project reactor에 대하여 알아보자.

 

우선 Mono와 Flux를 implements 했다.

 

우선 아래는 이번에 사용할 Subscriber 코드이다.

@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber <T> implements Subscriber<T> {

    private final Integer count;

    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(100);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

reactivestreams의 Subscriber이며, 저번에 구현했던 Subscriber와 굉장히 유사한 것을 볼 수 있다.

 

Flux

Flux는 0~n 개의 item을 전달한다.

만약 에러가 발생하면 error signal을 전달하고 종료하며, 모든 item을 전달했다면 complete signal을 전달하고 종료한다.

backPressure를 지원한다.

 

  • subscribe - Flux

우선 바로 사용해보자.

@Slf4j
public class FluxSimpleExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");

        Thread.sleep(1000);
    }

    public static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

그냥 간단하게 하나씩 넘겨주는 것이고 이것에 대한 결과는 아래와 같다.

 

모두 main 쓰레드에서 실행되는 것을 볼 수 있다.

 

  • subscribeOn - Flux

다른 쓰레드에서 실행시키고 싶다면 아래의 subscribeOn을 사용해야 한다.

@Slf4j
public class FluxSimpleSubscribeOnExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .map(i -> {
                    log.info("map {}", i);
                    return i;
                })
                .subscribeOn(Schedulers.single())
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");

        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 subscribeOn으로 쓰레드를 지정해주면, 해당 쓰레드에서 실행이 된다.

 

 

  • backPressure - Flux

 

BackPressure를 수행하는 코드를 작성해보자.

지금 Subscriber는 요청을 한 번만 하기 때문에 Subscriber를 다시 작성해야 한다.

 

다시 작성한 Subscribe이다.

onNext를 호출 당할 때마다, request로 다시 호출하여 계속 반복이 된다.

@Slf4j
public class ContinuousRequestSubscriber<T> implements Subscriber<T> {

    private final Integer count = 1;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);

        Thread.sleep(1000);
        subscription.request(1);
        log.info("request: {}", count);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

이렇게 작성된 Subscriber를 

@Slf4j
public class FluxContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        getItems().subscribe(new ContinuousRequestSubscriber<>());
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이 코드로 실행해본다면 다음과 같이 나오는 것을 볼 수 있다.

이렇게 Request 한 만큼만 데이터를 주어, 조절하는 것을 볼 수 있다.

 

  • error - Flux

이번에는 에러에 대한 처리이다.

Flux는 에러를 만나면 더 이상 진행하지 않는다.

@Slf4j
public class FluxErrorExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.create(fluxsink -> {
            fluxsink.next(0);
            fluxsink.next(1);
            var error = new RuntimeException("error in flux");
            fluxsink.error(error);
            fluxsink.next(2);
        });
    }
}

 

이렇게 0과 1을 주고 다음으로 에러를 넘기면 다음과 같이 나온다.

 

0과 1만 나오고 에러를 반환한 후 종료되는 것을 볼 수 있다.

 

  • complete - Flux

complete하는 방법으로 complete를 넘기면 종료된다.

@Slf4j
public class FluxCompleteExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.create(fluxSink -> {
            fluxSink.complete();
        });
    }
}

 

값을 넘기지 않아도 바로 종료되는 것을 볼 수 있다.

 

Mono

0..1개의 item을 전달한다.

에러가 발생하면 error signal을 전달하고 종료되며, 모든 item을 전달하면 complete signal을 전달하고 종료된다.

 

하나의 item만 전달하기 때문에 next 실행 후 바로 complete가 보장된다.

혹은 전달하지 않고 complete를 하면 값이 없다는 것을 의미한다.

이러한 이유로 Flux를 사용하지 않고 Mono를 사용하는 경우가 있다.

데이터베이스에서 하나의 값을 가져오는 경우 등에서 사용한다.

 

  • subscribe - Mono
@Slf4j
public class MonoSimpleExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");

        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");
        Thread.sleep(1000);
    }

    private static Mono<Integer> getItems(){
        return Mono.create(monoSink -> {
            monoSink.success(1);
        });
    }
}

 

일단 바로 사용해보도록 하자.

 

이렇게 하나의 값을 전달하고 종료되는 것을 볼 수 있다.

 

monoSink.success(2);

라는 코드를 추가해도 값이 전달되지 않고 그 전에 종료된다.

 

  • from - Mono

Flux를 Mono로 변환하는 방법이다.

첫 번째 값만 전달하게 된다.

 

from의 파라미터에 Flux를 넘겨주면 된다.

@Slf4j
public class FluxToMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        Mono.from(getItems())
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

 

1만 넘어가는 것을 볼 수 있다.

 

  • collectList - Mono

하나의 값만 전달하는 것이 아니라, 값들을 모두 모아 리스트로 전달하는 방법이다.

@Slf4j
public class FluxToListMonoExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .collectList()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 작성하면 수들을 모두 모아 리스트로 전달하게 된다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Mutiny  (0) 2024.03.14
RxJava Reactor  (2) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12

+ Recent posts