728x90

Pivotal에서 개발한 Project reactor에 대하여 알아보자.

 

우선 Mono와 Flux를 implements 했다.

 

우선 아래는 이번에 사용할 Subscriber 코드이다.

@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber <T> implements Subscriber<T> {

    private final Integer count;

    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(100);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

reactivestreams의 Subscriber이며, 저번에 구현했던 Subscriber와 굉장히 유사한 것을 볼 수 있다.

 

Flux

Flux는 0~n 개의 item을 전달한다.

만약 에러가 발생하면 error signal을 전달하고 종료하며, 모든 item을 전달했다면 complete signal을 전달하고 종료한다.

backPressure를 지원한다.

 

  • subscribe - Flux

우선 바로 사용해보자.

@Slf4j
public class FluxSimpleExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");

        Thread.sleep(1000);
    }

    public static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

그냥 간단하게 하나씩 넘겨주는 것이고 이것에 대한 결과는 아래와 같다.

 

모두 main 쓰레드에서 실행되는 것을 볼 수 있다.

 

  • subscribeOn - Flux

다른 쓰레드에서 실행시키고 싶다면 아래의 subscribeOn을 사용해야 한다.

@Slf4j
public class FluxSimpleSubscribeOnExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .map(i -> {
                    log.info("map {}", i);
                    return i;
                })
                .subscribeOn(Schedulers.single())
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");

        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 subscribeOn으로 쓰레드를 지정해주면, 해당 쓰레드에서 실행이 된다.

 

 

  • backPressure - Flux

 

BackPressure를 수행하는 코드를 작성해보자.

지금 Subscriber는 요청을 한 번만 하기 때문에 Subscriber를 다시 작성해야 한다.

 

다시 작성한 Subscribe이다.

onNext를 호출 당할 때마다, request로 다시 호출하여 계속 반복이 된다.

@Slf4j
public class ContinuousRequestSubscriber<T> implements Subscriber<T> {

    private final Integer count = 1;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);

        Thread.sleep(1000);
        subscription.request(1);
        log.info("request: {}", count);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

이렇게 작성된 Subscriber를 

@Slf4j
public class FluxContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        getItems().subscribe(new ContinuousRequestSubscriber<>());
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이 코드로 실행해본다면 다음과 같이 나오는 것을 볼 수 있다.

이렇게 Request 한 만큼만 데이터를 주어, 조절하는 것을 볼 수 있다.

 

  • error - Flux

이번에는 에러에 대한 처리이다.

Flux는 에러를 만나면 더 이상 진행하지 않는다.

@Slf4j
public class FluxErrorExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.create(fluxsink -> {
            fluxsink.next(0);
            fluxsink.next(1);
            var error = new RuntimeException("error in flux");
            fluxsink.error(error);
            fluxsink.next(2);
        });
    }
}

 

이렇게 0과 1을 주고 다음으로 에러를 넘기면 다음과 같이 나온다.

 

0과 1만 나오고 에러를 반환한 후 종료되는 것을 볼 수 있다.

 

  • complete - Flux

complete하는 방법으로 complete를 넘기면 종료된다.

@Slf4j
public class FluxCompleteExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.create(fluxSink -> {
            fluxSink.complete();
        });
    }
}

 

값을 넘기지 않아도 바로 종료되는 것을 볼 수 있다.

 

Mono

0..1개의 item을 전달한다.

에러가 발생하면 error signal을 전달하고 종료되며, 모든 item을 전달하면 complete signal을 전달하고 종료된다.

 

하나의 item만 전달하기 때문에 next 실행 후 바로 complete가 보장된다.

혹은 전달하지 않고 complete를 하면 값이 없다는 것을 의미한다.

이러한 이유로 Flux를 사용하지 않고 Mono를 사용하는 경우가 있다.

데이터베이스에서 하나의 값을 가져오는 경우 등에서 사용한다.

 

  • subscribe - Mono
@Slf4j
public class MonoSimpleExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");

        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");
        Thread.sleep(1000);
    }

    private static Mono<Integer> getItems(){
        return Mono.create(monoSink -> {
            monoSink.success(1);
        });
    }
}

 

일단 바로 사용해보도록 하자.

 

이렇게 하나의 값을 전달하고 종료되는 것을 볼 수 있다.

 

monoSink.success(2);

라는 코드를 추가해도 값이 전달되지 않고 그 전에 종료된다.

 

  • from - Mono

Flux를 Mono로 변환하는 방법이다.

첫 번째 값만 전달하게 된다.

 

from의 파라미터에 Flux를 넘겨주면 된다.

@Slf4j
public class FluxToMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        Mono.from(getItems())
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

 

1만 넘어가는 것을 볼 수 있다.

 

  • collectList - Mono

하나의 값만 전달하는 것이 아니라, 값들을 모두 모아 리스트로 전달하는 방법이다.

@Slf4j
public class FluxToListMonoExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .collectList()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 작성하면 수들을 모두 모아 리스트로 전달하게 된다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Mutiny  (0) 2024.03.14
RxJava Reactor  (2) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12

+ Recent posts