728x90

Java에서 IO에 대한 부분을 알아보자.

 

우선 Java IO는 파일과 네트워크에 데이터를 읽고 쓸 수 있는 API를 제공하며, blocking으로 동작한다.

 

InputStream

우선 InputStream부터 살펴보자.

public abstract class InputStream implements Closeable{
    public abstract int read() throws IOException;
    public void close() throws IOException;
}

 

read는 stream으로 데이터를 읽고 읽은 값을 반환한다. 만약 -1을 반환한다면 파일의 끝인 EOF이다.

close는 stream을 닫고 더 이상 데이터를 읽지 않는다.

 

이 InputStream은 다양한 구현체가 존재한다.

파일을 읽어오는 FileInputStream, 바이트 배열을 읽어오는 ByteArrayInputStream, 별도의 버퍼를 만들어서 읽어오는 BufferedInputStream 등이 있다.

 

ByteArrayInputStream

byte array로부터 값을 읽을 수 있다.

 

@Slf4j
public class ByteArrayInputStreamAllExample {
    public static void main(String[] args) throws IOException {
        log.info("start");

        var bytes = new byte[]{100, 101, 102, 103, 104, 105};

        try(var byteArrayInputStream = new ByteArrayInputStream(bytes)){
            var values = byteArrayInputStream.readAllBytes();
            log.info("values: {}", values);
        }
        log.info("end");
    }
}

 

해당 코드로 알아보도록 하자.

 

byteArray에서 ByteArrayInputStream을 통해 데이터를 모두 읽어오는 것을 볼 수 있다.

 

readAllBytes는 모두 읽어오는 메서드이고, read를 사용하면 하나씩 데이터를 가져오게 된다.

@Slf4j
public class ByteArrayInputStreamExample {

    public static void main(String[] args) throws IOException {
        log.info("start");
        var bytes = new byte[]{100, 101, 102, 103, 104};

        try(var byteArrayInputStream = new ByteArrayInputStream(bytes)){
            var value = 0;

            while((value = byteArrayInputStream.read()) != -1)
                log.info("value: {}", value);
        }

        log.info("end");
    }
}

 

read를 사용하면, 하나씩 읽어온다.

 

FileInputStream

file로부터 byte 단위로 값을 읽을 수 있다.

file을 읽어오는 동안에는 blocking이 일어난다.

 

@Slf4j
public class FileInputStreamExample {
    public static void main(String[] args) throws IOException {
        log.info("start main");
        var file = new File("path");

        try (var fis = new FileInputStream(file)) {
            var value = 0;

            while ((value = fis.read()) != -1) {
                log.info("value: {}", (char)value);
            }
        }
        log.info("end main");
    }
}

 

이렇게 해당 파일을 읽어오게 된다.

읽을 때는 한 바이트 씩 읽어오게 된다.

 

BufferedInputStream

다른 inputStream과 조합해서 사용한다.

이름 그대로 읽어올 때마다, buffer의 사이즈만큼 미리 읽어오게 된다.

@Slf4j
public class BufferedInputStreamExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        try(var fileInputStream = new FileInputStream(file)){
            try(var bufferedInputStream = new BufferedInputStream(fileInputStream)){
                var value = 0;

                while((value = bufferedInputStream.read()) != -1){
                    log.info("value : {}", (char)value);
                }
            }
        }
    }
}

 

이렇게 항상 File에 접근하는 것이 아닌 buffer에 접근하여 데이터를 가져오는 것을 볼 수 있다.

 

SocketInputStream

SocketInputStream은 Public이 아니기 때문에 직접 접근이 불가능하다.

호출 시에 blocking이 발생한다.

 

간단한 서버를 만들어보았다.

@Slf4j
public class ServerSocketInputStreamExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        ServerSocket serverSocket = new ServerSocket(8080);

        Socket clientSocket = serverSocket.accept();

        var inputStream = clientSocket.getInputStream();
        var bufferedInputStream = new BufferedInputStream(inputStream);
        byte[] buffer = new byte[1024];
        int bytesRead = bufferedInputStream.read(buffer);
        String inputLine = new String(buffer, 0, bytesRead);
        log.info("bytes: {}", inputLine);

        clientSocket.close();
        serverSocket.close();

        log.info("end");

    }
}

 

서버와 클라이언트의 동작 방식은 네트워크 프로그래밍에서 배웠기 때문에 다루지 않도록 하겠다.

 

서버 소캣으로부터 inputStream을 만들어서 버퍼로 가져오는 코드이다.

하지만 현재는 클라이언트가 없어서 아마 아무 결과도 나오지 않을 것이다.

 

 

OutputStream

public abstract class OutputStream implements Closeable, Flushable{
    public abstract void write(int b) throws IOException;
    public void flush() throws IOException{}
    public void close() throws IOException{}
}

 

inputStream과는 다르게 버퍼를 출력하고 비우는 Flush가 존재하는 것을 볼 수 있다.

 

당연히 OutputStream도 InputStream처럼 많은 구현체가 존재하는 것을 알 수 있다.

 

 

ByteArrayOutputStream

@Slf4j
public class ByteArrayOutputStreamExample {

    public static void main(String[] args) throws IOException {
        log.info("start");

        try(var byteArrayOutputStream = new ByteArrayOutputStream()){
            byteArrayOutputStream.write(100);
            byteArrayOutputStream.write(101);
            byteArrayOutputStream.write(102);
            byteArrayOutputStream.write(103);
            byteArrayOutputStream.write(104);

            var bytes = byteArrayOutputStream.toByteArray();
            log.info("bytes: {}", bytes);
        }

        log.info("end");
    }
}

 

byteArrayOutputStream에 하나씩 작성하고 마지막에 byteArray로 변환하여 값을 출력한다.

 

 

ByteArrayOutputStream 

file에 값을 쓰는 OutputStream이다.

해당 메서드도 blocking이 일어난다.

@Slf4j
public class FileOutputStreamExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        try(var fileOutputStream = new FileOutputStream(file)){
            var data = "Hi Seungkyu";

            fileOutputStream.write(data.getBytes());
            fileOutputStream.flush();
        }
    }
}

 

write 후에 flush까지 호출해 주도록 한다.

 

 

BufferedOutputStream

이것도 마찬가지로 다른 outputStream과 조합하여 사용한다.

write 호출하면 buffer에만 write 하고, 후에 Flush를 호출하여 한 번에 outputStream에 작성한다.

@Slf4j
public class BufferedOutputStreamExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        var fileOutputStream = new FileOutputStream(file);
        try(var bufferedOutputStream = new BufferedOutputStream(fileOutputStream)){
            var data = "Hi seungkyu!!";

            bufferedOutputStream.write(data.getBytes());
            bufferedOutputStream.flush();
        }
    }
}

 

write 후에 작성을 위하여 flush를 사용해야 한다.

 

 

SocketOutputStream

SocketOutputStream도 Public이 아니기 때문에 직접 접근이 불가능하다.

호출 시에 blocking이 발생한다.

 

@Slf4j
public class ServerSocketOutputStreamExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start");

        ServerSocket serverSocket = new ServerSocket(8080);

        Socket clientSocket = serverSocket.accept();

        byte[] buffer = new byte[1024];
        clientSocket.getInputStream().read(buffer);

        var outputStream = clientSocket.getOutputStream();
        var byteOutputStream = new BufferedOutputStream(outputStream);
        var bytes = "Hi Seungkyu".getBytes();
        byteOutputStream.write(bytes);
        byteOutputStream.flush();

        clientSocket.close();
        serverSocket.close();
        log.info("end");
    }
}

 

이런 식으로 내용을 작성하게 된다.

 

Java IO Reader, Writer

Stream에서 character 단위로 읽고 쓸 수 있다.

문자 인코딩을 지원하여 바로 문자로 가져올 수 있다는 것이다.

blocking으로 동작하게 된다.

 

Reader

바로 인코딩을 지정하여 3바이트씩 가져오게 된다.

@Slf4j
public class FileReaderExample {

    public static void main(String[] args) throws IOException {
        var file = new File("path");

        var charset = StandardCharsets.UTF_8;

        try(var fileInputStream = new FileReader(file, charset)){
            var value = 0;

            while((value = fileInputStream.read()) != -1){
                log.info("value: {}", (char)value);
            }
        }
    }
}

 

 

Writer

@Slf4j
public class FileWriterExample {

    public static void main(String[] args) throws IOException {

        var file = new File("path");

        var charset = StandardCharsets.UTF_8;
        try(var fileInputStream = new FileWriter(file, charset)){
            var data = "hihihihi";
            fileInputStream.write(data);
        }
    }
}

 

바로 문자열을 write하게 된다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java IO Server 서버를 NIO로 변경  (0) 2024.03.19
Java NIO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
RxJava Reactor  (2) 2024.03.14
Project reactor  (0) 2024.03.14
728x90

Multi

0..n개의 item을 전달

에러가 발생하면 error signal을 전달하고, 모든 item을 전달하면 complete signal을 전달하고 종료한다.

backPressure를 지원하며, Reactor의 Flux와 유사하다.

 

Multi의 Subscriber이다.

다른 Subscriber들과 메서드 이름이 좀 다르긴 하지만, 기능은 생각하는 그대로이다.

@Slf4j
@RequiredArgsConstructor
public class SimpleMultiSubscriber<T> implements MultiSubscriber<T> {

    private final Integer count;

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("fail: {}", failure.getMessage());
    }

    @Override
    public void onCompletion() {
        log.info("completion");
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(count);
        log.info("subscribe");
    }
}

 

예제로 실행을 해보자.

@Slf4j
public class MultiExample {

    public static void main(String[] args) {
        getItems()
                .subscribe()
                .withSubscriber(
                        new SimpleMultiSubscriber<>(Integer.MAX_VALUE)
                );
    }

    private static Multi<Integer> getItems(){
        return Multi.createFrom().items(1, 2, 3, 4, 5);
    }
}

 

subscribe에 Subscriber를 넣는 것이 아니라, withSubscriber에 넘겨야 한다.

 

Uni

0..1개의 item을 전달

에러가 발생하면 error signal 전달하고 종료, 모든 item을 전달했다면 complete signal을 전달하고 종료한다.

Reactor의 Mono와 유사하다.

 

@Slf4j
@RequiredArgsConstructor
public class SimpleUniSubscriber<T> implements UniSubscriber<T> {

    private final Integer count;
    private UniSubscription subscription;

    @Override
    public void onSubscribe(UniSubscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
        log.info("subscribe");
    }

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("error: {}", failure.getMessage());
    }
}

 

하나면 보내면 complete이기 때문에 따로 complete 메서드는 없다.

 

아래의 코드로 실행을 해보자.

@Slf4j
public class UniExample {

    public static void main(String[] args) {
        getItem()
                .subscribe()
                .withSubscriber(new SimpleUniSubscriber<>(Integer.MAX_VALUE));
    }

    private static Uni<Integer> getItem(){
        return Uni.createFrom().item(1);
    }
}

 

그럼 하나의 값이 나오게 될 것이다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
RxJava Reactor  (2) 2024.03.14
Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
728x90

Netflix에서 구현한 Reactive Stream이라고 한다.

 

Flowable

0..n개의 item을 전달한다.

에러가 발생하면 error signal을 전달하고 종료하며, 모든 item을 전달하면 complete signal을 전달하고 종료한다.

중요한 backPressure를 지원한다고 한다.

이거까지만 보면 Flux와 굉장히 비슷해 보이는 거 같다.

덕분에 다행이도 저번에 작성했던 subscriber를 사용할 수 있다.

 

  • subscribe - Flowable

저번에 사용했던 코드와 크게 다르지 않다.

@Slf4j
public class FlowableExample {

    public static void main(String[] args) {
        log.info("start main");

        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");
    }

    private static Flowable<Integer> getItems(){
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

딱 생각했던 대로 출력이 된다.

 

  • backPressure - Flowable

전에 작성했던 ContinuousRequestSubscriber를 사용해서 backPressure를 확인해보자.

@Slf4j
public class FlowableContinuousRequestSubscriberExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new ContinuousRequestSubscriber<>());
        log.info("end main");
    }

    private static Flowable<Integer> getItems(){
        return Flowable.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 요청에 따라 데이터를 주는 것을 볼 수 있다.

 

Observable

0..n개의 item을 전달한다.

에러가 발생하면 error signal을 전달하고 종료하며, 모든 item을 전달하면 complete signal을 전달하고 종료한다.

하지만 backPressure를 지원하지는 않는다고 한다.

 

subscriber가 요청하지 않더라도 item을 전달하는 형식이다.

 

간단하게 작성하여 실행해 보았는데, request를 하지 않아도 지속적으로 데이터를 주는 것을 볼 수 있었다.

 

Single

1개의 item을 전달 후 바로 onComplete signal wjsekf

1개의 item이 없다면 onError signal을 전달하고, 에러가 발생해도 onError signal을 전달한다.

 

@Slf4j
public class SimpleObserver implements Observer {

    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
        this.disposable = d;
    }

    @Override
    public void onNext(@NonNull Object o) {
        log.info("item: {}", o);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

이렇게 Oberser를 작성하고 예제를 실행해보자.

@Slf4j
public class SingleExample {

    public static void main(String[] args) {
        getItem()
                .subscribe(new SimpleSingleObserver<>());

        getNullItem()
                .subscribe(new SimpleSingleObserver<>());
    }

    private static Single<Integer> getItem(){
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(1);
        });
    }

    private static Single<Integer> getNullItem(){
        return Single.create(singleEmitter -> {
            singleEmitter.onSuccess(null);
        });
    }
}

 

위에 코드는 성공이고, 밑의 코드는 null을 반환한다.

Single에서는 null을 반환하면 에러가 뜨는 것을 확인 할 수 있다.

 

Maybe

1개의 Item을 전달 수 바로 OnComplete signal을 전달한다.

1개의 Item이 없어도 onComplete signal 전달 가능

에러가 발생했다면 onError signal 전달

 

@Slf4j
public class SimpleMaybeObserver<T> implements MaybeObserver<T> {

    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        log.info("subscribe");
    }

    @Override
    public void onSuccess(@NonNull T t) {
        log.info("item: {}", t);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

바로 onComplete를 전달 할 수도 있어서, 해당 메서드도 implements가 된 것을 볼 수 있다.

 

@Slf4j
public class MaybeExample {

    public static void main(String[] args) {
        maybeGetItem()
                .subscribe(new SimpleMaybeObserver<>());

        maybeGetNullItem()
                .subscribe(new SimpleMaybeObserver<>());

    }

    private static Maybe<Integer> maybeGetItem(){
        return Maybe.create(maybeEmitter -> {
            maybeEmitter.onSuccess(1);
        });
    }

    private static Maybe<Integer> maybeGetNullItem(){
        return Maybe.create(maybeEmitter -> {
           maybeEmitter.onComplete();
        });
    }
}

 

바로 onComplete 호출이 가능한 것을 확인 할 수 있다.

 

Completable

onComplete 혹은 onError signal만 전달한다.

값이 아닌 사건을 전달해준다.

 

@Slf4j
public class SimpleCompletableObserver implements CompletableObserver {

    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        log.info("subscribe");
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        log.error("error: {}", e.getMessage());
    }
}

 

이런 식으로 구성이 되어 있고

@Slf4j
public class CompletableExample {

    public static void main(String[] args) {
        getCompletion()
                .subscribe(new SimpleCompletableObserver());
    }

    private static Completable getCompletion(){
        return Completable.create(completableEmitter -> {
            Thread.sleep(1000);
            completableEmitter.onComplete();
        });
    }
}

 

이렇게 사용한다고 한다.

자세한 부분까지는 모르겠다.

 

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

JAVA IO  (0) 2024.03.18
Mutiny  (0) 2024.03.14
Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
728x90

Pivotal에서 개발한 Project reactor에 대하여 알아보자.

 

우선 Mono와 Flux를 implements 했다.

 

우선 아래는 이번에 사용할 Subscriber 코드이다.

@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber <T> implements Subscriber<T> {

    private final Integer count;

    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(100);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

reactivestreams의 Subscriber이며, 저번에 구현했던 Subscriber와 굉장히 유사한 것을 볼 수 있다.

 

Flux

Flux는 0~n 개의 item을 전달한다.

만약 에러가 발생하면 error signal을 전달하고 종료하며, 모든 item을 전달했다면 complete signal을 전달하고 종료한다.

backPressure를 지원한다.

 

  • subscribe - Flux

우선 바로 사용해보자.

@Slf4j
public class FluxSimpleExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");

        Thread.sleep(1000);
    }

    public static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

그냥 간단하게 하나씩 넘겨주는 것이고 이것에 대한 결과는 아래와 같다.

 

모두 main 쓰레드에서 실행되는 것을 볼 수 있다.

 

  • subscribeOn - Flux

다른 쓰레드에서 실행시키고 싶다면 아래의 subscribeOn을 사용해야 한다.

@Slf4j
public class FluxSimpleSubscribeOnExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .map(i -> {
                    log.info("map {}", i);
                    return i;
                })
                .subscribeOn(Schedulers.single())
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");

        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 subscribeOn으로 쓰레드를 지정해주면, 해당 쓰레드에서 실행이 된다.

 

 

  • backPressure - Flux

 

BackPressure를 수행하는 코드를 작성해보자.

지금 Subscriber는 요청을 한 번만 하기 때문에 Subscriber를 다시 작성해야 한다.

 

다시 작성한 Subscribe이다.

onNext를 호출 당할 때마다, request로 다시 호출하여 계속 반복이 된다.

@Slf4j
public class ContinuousRequestSubscriber<T> implements Subscriber<T> {

    private final Integer count = 1;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);

        Thread.sleep(1000);
        subscription.request(1);
        log.info("request: {}", count);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

이렇게 작성된 Subscriber를 

@Slf4j
public class FluxContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        getItems().subscribe(new ContinuousRequestSubscriber<>());
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이 코드로 실행해본다면 다음과 같이 나오는 것을 볼 수 있다.

이렇게 Request 한 만큼만 데이터를 주어, 조절하는 것을 볼 수 있다.

 

  • error - Flux

이번에는 에러에 대한 처리이다.

Flux는 에러를 만나면 더 이상 진행하지 않는다.

@Slf4j
public class FluxErrorExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.create(fluxsink -> {
            fluxsink.next(0);
            fluxsink.next(1);
            var error = new RuntimeException("error in flux");
            fluxsink.error(error);
            fluxsink.next(2);
        });
    }
}

 

이렇게 0과 1을 주고 다음으로 에러를 넘기면 다음과 같이 나온다.

 

0과 1만 나오고 에러를 반환한 후 종료되는 것을 볼 수 있다.

 

  • complete - Flux

complete하는 방법으로 complete를 넘기면 종료된다.

@Slf4j
public class FluxCompleteExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.create(fluxSink -> {
            fluxSink.complete();
        });
    }
}

 

값을 넘기지 않아도 바로 종료되는 것을 볼 수 있다.

 

Mono

0..1개의 item을 전달한다.

에러가 발생하면 error signal을 전달하고 종료되며, 모든 item을 전달하면 complete signal을 전달하고 종료된다.

 

하나의 item만 전달하기 때문에 next 실행 후 바로 complete가 보장된다.

혹은 전달하지 않고 complete를 하면 값이 없다는 것을 의미한다.

이러한 이유로 Flux를 사용하지 않고 Mono를 사용하는 경우가 있다.

데이터베이스에서 하나의 값을 가져오는 경우 등에서 사용한다.

 

  • subscribe - Mono
@Slf4j
public class MonoSimpleExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");

        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");
        Thread.sleep(1000);
    }

    private static Mono<Integer> getItems(){
        return Mono.create(monoSink -> {
            monoSink.success(1);
        });
    }
}

 

일단 바로 사용해보도록 하자.

 

이렇게 하나의 값을 전달하고 종료되는 것을 볼 수 있다.

 

monoSink.success(2);

라는 코드를 추가해도 값이 전달되지 않고 그 전에 종료된다.

 

  • from - Mono

Flux를 Mono로 변환하는 방법이다.

첫 번째 값만 전달하게 된다.

 

from의 파라미터에 Flux를 넘겨주면 된다.

@Slf4j
public class FluxToMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        Mono.from(getItems())
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

 

1만 넘어가는 것을 볼 수 있다.

 

  • collectList - Mono

하나의 값만 전달하는 것이 아니라, 값들을 모두 모아 리스트로 전달하는 방법이다.

@Slf4j
public class FluxToListMonoExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .collectList()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 작성하면 수들을 모두 모아 리스트로 전달하게 된다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Mutiny  (0) 2024.03.14
RxJava Reactor  (2) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
728x90

그 다음은 HotPublisher이다.

 

소비자와 관계 없이 데이터를 한 번만 통지하는 것이 HotPublisher이다.

해당하는 데이터는 한 번만 통지하고, 소비자는 구독한 시점 이후에 통지된 데이터들만 전달 받을 수 있다.

 

바로 작성해보도록 하자.

Subscriber는 변함 없이 ColdPublisher에서 사용했던 코드를 그대로 사용한다.

 

  • Publisher

Cold와는 지속적으로 데이터를 전달한다.

@Slf4j
public class SimpleHotPublisher implements Flow.Publisher<Integer>{

    private final ExecutorService publisherExecutor = Executors.newSingleThreadExecutor();
    private final Future<Void> task;
    private List<Integer> numbers = new ArrayList<>();
    private List<SimpleHotSubscription> subscriptions = new ArrayList<>();

    public SimpleHotPublisher(){
        numbers.add(1);
        task = publisherExecutor.submit(() -> {
            for(int i = 2; !Thread.interrupted(); i++){
                numbers.add(i);
                subscriptions.forEach(SimpleHotSubscription::wakeup);
                Thread.sleep(100);
            }
            return null;
        });
    }

    public void shutdown(){
        this.task.cancel(true);
        publisherExecutor.shutdown();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        var subscription = new SimpleHotSubscription(subscriber);
        subscriber.onSubscribe(subscription);
        subscriptions.add(subscription);
    }

    private class SimpleHotSubscription implements Flow.Subscription{
        private int offset;
        private int requiredOffset;
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService subscriptionExecutorService = Executors.newSingleThreadExecutor();

        public SimpleHotSubscription(Flow.Subscriber<? super Integer> subscriber){
            int lastElementIndex = numbers.size() - 1;
            this.offset = lastElementIndex;
            this.requiredOffset = lastElementIndex;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            requiredOffset += n;

            onNextWhilePossible();
        }

        @Override
        public void cancel() {
            this.subscriber.onComplete();
            if(subscriptions.contains(this)){
                subscriptions.remove(this);
            }
            subscriptionExecutorService.shutdown();
        }

        public void wakeup(){
            onNextWhilePossible();
        }

        private void onNextWhilePossible(){
            subscriptionExecutorService.submit(() -> {
                while (offset < requiredOffset && offset < numbers.size()){
                    var item = numbers.get(offset);
                    subscriber.onNext(item);
                    offset++;
                }
            });
        }

    }
}

 

해당 코드를 실행해보자.

 

 

이렇게 시점에 따라 전달받는 데이터가 다른 것을 볼 수 있다.

 

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

RxJava Reactor  (2) 2024.03.14
Project reactor  (0) 2024.03.14
ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
728x90

Publisher 중에서 ColdPublisher를 구현해보자.

 

ColdPublisher에 대해 먼저 알아보자.

 

소비자가 구독 할 때마다 데이터를 처음부터 새로 통지하는 것을 ColdPublisher라고 한다.

소비자는 구독 시점과 상관 없이 통지된 데이터를 처음부터 전달 받게 된다.

 

모든 소비자가 같은 데이터를 전달받게 될 것이다.

 

간단하게 한 번 구현해보자.

 

  • Publisher, Subscription

우선 Publisher와 Subscription이다.

public class SimpleColdPublisher implements Flow.Publisher<Integer>{

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        var iterator = Collections.synchronizedList(
                IntStream.range(1, 10).boxed().collect(Collectors.toList())
        ).iterator();
        var subscription = new SimpleColdSubscription(iterator, subscriber);
        subscriber.onSubscribe(subscription);
    }

    public class SimpleColdSubscription implements Flow.Subscription{

        private final Iterator<Integer> iterator;
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        public SimpleColdSubscription(Iterator<Integer> iterator, Flow.Subscriber<? super Integer> subscriber) {
            this.iterator = iterator;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            executor.submit(() -> {
               for(int i = 0; i < n; i++){
                   if(iterator.hasNext()){
                       var number = iterator.next();
                       iterator.remove();
                       subscriber.onNext(number);
                   }
                   else{
                       subscriber.onComplete();
                       executor.shutdown();
                       break;
                   }
               }
            });
        }

        @Override
        public void cancel() {
            subscriber.onComplete();
        }
    }
}

 

ColdPublisher답게 전달할 데이터의 배열을 넘겨주게 된다.

 

해당 iterator의 데이터를 전부 전달해주고 종료된다.

 

 

  • Subscriber

그 다음은 Subscriber이다.

subscription을 인자로 받는 onSubscribe가 있으며, onNext를 호출 할 때마다 request로 데이터를 하나씩 받아온다.

 

@Slf4j
public class SimpleNamedSubscriber<T> implements Flow.Subscriber<T> {

    private Flow.Subscription subscription;
    private final String name;

    public SimpleNamedSubscriber(String name){
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
        log.info("onSubscribe");
    }

    @Override
    public void onNext(T item) {
        log.info("name: {}, onNext: {}", name, item);
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("onError: {}", throwable.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("onComplete");
    }
    
    public void cancel(){
        log.info("cancel");
        this.subscription.cancel();
    }
}

 

 

해당 코드를 실행해보자.

public class SimpleColdPublisherMain {

    @SneakyThrows
    public static void main(String[] args) {
        //create Publisher
        var publisher = new SimpleColdPublisher();

        var subscriber1 = new SimpleNamedSubscriber<Integer>("subscriber1");
        publisher.subscribe(subscriber1);

        Thread.sleep(5000);

        var subscriber2 = new SimpleNamedSubscriber<Integer>("subscriber2");
        publisher.subscribe(subscriber2);
    }
}

 

다음과 같은 코드로 Publisher와 Subscriber를 테스트 해보았다.

 

 

이렇게 subscriber1, subscriber2가 다른 시간에 subscribe를 했음에도 같은 결과가 나오는 것을 볼 수 있다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
CompletableFuture 인터페이스  (1) 2024.03.06
728x90

이번에 리액티브 프로그래밍을 공부하다가, 이런 용어들이 나왔다.

중간에 흐름을 조절해 준다는 역할을 한다고 하는데, 이해가 잘 되지 않아 일단 정리해 보고 넘어가려 한다.

 

 

우선 이해하기 쉽게 그림이다.

 

  • Publisher

Publisher가 Subscriber를 Subscribe한다.

Publisher는 request를 받으면 데이터를 생성하여 보낸다.

  • Subscriber

Subscriber가 Subscription을 onSubscribe 한다.

Subscriber는 필요할 때 Subscribe의 request를 통해 Publisher에게 데이터를 요청한다.

Subscriber는 onNext로 데이터를 받는다.

  • Subscription

Subscription은 Subscriber에 의해 등록된다.

 

 

모든 요청이 성공적으로 완료된다면 onComplete를, 요청이 실패하면 onError를 호출하고 흐름을 종료한다.

 

 

당연히 Publisher는 여러 개의 Subscriber를 Subscribe 가능하다.

 

하나씩 살펴보자

 

Publisher

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

 

하나의 메서드 밖에 없다.

subscribe로 Subscriber를 등록하면 된다.

 

Subscription

public static interface Subscription {
    public void request(long n);

    public void cancel();
}

 

 

  • requst

Subscriber가 데이터를 처리 가능 할 때 request를 호출한다.

파라미터 n은 Publisher에게 요청하는 데이터의 개수이다.

  • cancel

Publisher에게 데이터를 그만 보내라고 요청하는 메서드이다.

 

Subscriber

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    
    public void onNext(T item);

    public void onError(Throwable throwable);

    public void onComplete();
}

 

  • onSubscribe

Subscription을 파라미터로 받아 request를 호출한다.

Subscription의 request를 호출하는 것은 온전히 Subscriber의 결정이며, 호출되기 전까지는 어떤 데이터도 흐르지 않는다.

  • onNext

Publisher가 보낸 데이터이다.

  • onError

에러로 종료

  • onComplete

성공적으로 종료

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
CompletableFuture 인터페이스  (1) 2024.03.06
CompletionStage 인터페이스  (1) 2024.03.05
728x90

지금까지 배운 것을 바탕으로 성능을 개선해보자.

 

동기적으로 작성된 코드를 비동기적으로 변경하는 것이다.

현재 동기적으로 작성된 코드는 다음과 같다.

public Optional<User> getUserById(String id){
        return userRepository.findById(id)
                .map(user -> {
                    var image = imageRepository.findById(user.getProfileImageId())
                            .map(imageEntity -> {
                                return new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl());
                            });

                    var articles = articleRepository.findAllByUserId(user.getId())
                            .stream().map(articleEntity ->
                                    new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())).toList();

                    var followCount = followRepository.countByUserID(user.getId());

                    return new User(
                            user.getId(),
                            user.getName(),
                            user.getAge(),
                            image,
                            articles,
                            followCount
                    );
                });
    }

 

Repository에서 가져올 때마다 1초 정도 시간이 걸린다고 생각하여 1초 정도 Thread.sleep을 사용했고, 조회한 값들을 다른 객체에 넣어서 반환하는 메서드이다.

 

차례대로 1초씩 3번 호출하기 때문에 적어도 3초 이상의 시간이 소모될 것이다.

 

    void testGetUser(){
        //given
        String userId = "1234";

        //when
        Optional<User> optionalUser = userBlockingService.getUserById(userId);

        //then
        assertFalse(optionalUser.isEmpty());
        var user = optionalUser.get();
        assertEquals(user.getName(), "sk");

        assertFalse(user.getProfileImage().isEmpty());
        assertFalse(user.getProfileImage().isEmpty());
        var image = user.getProfileImage().get();
        assertEquals(image.getId(), "image#1000");
        assertEquals(image.getName(), "profileImage");
        assertEquals(image.getUrl(), "https://avatars.githubusercontent.com/u/98071131?s=400&u=9107a0b50b52da5bbc8528157eed1cca34feb3c5&v=4");

        assertEquals(2, user.getArticleList().size());

        assertEquals(1000, user.getFollowCount());
    }

 

해당 테스트 코드의 시간을 확인 했을 때 4sec 74ms가 소모되었다.

 

이제 변경해보도록 하자.

일단 각 Repository를 CompletableFuture을 반환하도록 메서드를 변경했다.

    @SneakyThrows
    public CompletableFuture<Optional<UserEntity>> findById(String userId){
        return CompletableFuture.supplyAsync(() -> {
            log.info("UserRepository.findById: {}", userId);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            var user = userMap.get(userId);
            return Optional.ofNullable(user);
        });
    }

 

 

현재 에러만 잡아서 변경해놓은 코드는 다음과 같다.

    @SneakyThrows
    public Optional<User> getUserById(String id){
        return userRepository.findById(id).get()
                .map(this::getUser);
    }

    @SneakyThrows
    private User getUser(UserEntity user){
        var image = imageRepository.findById(user.getProfileImageId()).get()
                .map(imageEntity -> {
                    return new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl());
                });

        var articles = articleRepository.findAllByUserId(user.getId()).get()
                .stream().map(articleEntity ->
                        new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())).toList();

        var followCount = followRepository.countByUserID(user.getId()).get();

        return new User(
                user.getId(),
                user.getName(),
                user.getAge(),
                image,
                articles,
                followCount
        );
    }

 

이거를 Repository에 접근 할 때마다 CompletableFuture를 사용했다.

 

    @SneakyThrows
    public CompletableFuture<Optional<User>> getUserById(String id){
        return userRepository.findById(id)
                .thenCompose(this::getUser);
    }

    @SneakyThrows
    private CompletableFuture<Optional<User>> getUser(Optional<UserEntity> userEntityOptional) {
        if (userEntityOptional.isEmpty()) {
            return CompletableFuture.completedFuture(Optional.empty());
        }

        var userEntity = userEntityOptional.get();

        var imageFuture = imageRepository.findById(userEntity.getProfileImageId())
                .thenApplyAsync(imageEntityOptional ->
                        imageEntityOptional.map(imageEntity ->
                                new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl())
                        )
                );


        var articlesFuture = articleRepository.findAllByUserId(userEntity.getId())
                .thenApplyAsync(articleEntities ->
                        articleEntities.stream()
                                .map(articleEntity ->
                                        new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())
                                )
                                .collect(Collectors.toList())
                );

        var followCountFuture = followRepository.countByUserID(userEntity.getId());

        return CompletableFuture.allOf(imageFuture, articlesFuture, followCountFuture)
                .thenAcceptAsync(v -> {
                    log.info("Three futures are completed");
                })
                .thenRunAsync(() -> {
                    log.info("Three futures are also completed");
                })
                .thenApplyAsync(v -> {
                    try {
                        var image = imageFuture.get();
                        var articles = articlesFuture.get();
                        var followCount = followCountFuture.get();

                        return Optional.of(
                                new User(
                                        userEntity.getId(),
                                        userEntity.getName(),
                                        userEntity.getAge(),
                                        image,
                                        articles,
                                        followCount
                                )
                        );
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
    }
}

 

이렇게 변경하고 전에 사용했던 테스트 코드의 시간을 측정해보았다.

 

거의 반으로 줄어든 2sec 66ms가 나왔다.

 

전 코드에서 Repository에 접근 하는 것을 기다리기 보다 비동기적으로 실행한다면 시간을 크게 줄일 수 있는 것을 볼 수 있었던 것 같다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture 인터페이스  (1) 2024.03.06
CompletionStage 인터페이스  (1) 2024.03.05
Future 인터페이스  (1) 2024.01.09

+ Recent posts