728x90

Pivotal에서 개발한 Project reactor에 대하여 알아보자.

 

우선 Mono와 Flux를 implements 했다.

 

우선 아래는 이번에 사용할 Subscriber 코드이다.

@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber <T> implements Subscriber<T> {

    private final Integer count;

    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(100);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

reactivestreams의 Subscriber이며, 저번에 구현했던 Subscriber와 굉장히 유사한 것을 볼 수 있다.

 

Flux

Flux는 0~n 개의 item을 전달한다.

만약 에러가 발생하면 error signal을 전달하고 종료하며, 모든 item을 전달했다면 complete signal을 전달하고 종료한다.

backPressure를 지원한다.

 

  • subscribe - Flux

우선 바로 사용해보자.

@Slf4j
public class FluxSimpleExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");

        Thread.sleep(1000);
    }

    public static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

그냥 간단하게 하나씩 넘겨주는 것이고 이것에 대한 결과는 아래와 같다.

 

모두 main 쓰레드에서 실행되는 것을 볼 수 있다.

 

  • subscribeOn - Flux

다른 쓰레드에서 실행시키고 싶다면 아래의 subscribeOn을 사용해야 한다.

@Slf4j
public class FluxSimpleSubscribeOnExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .map(i -> {
                    log.info("map {}", i);
                    return i;
                })
                .subscribeOn(Schedulers.single())
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");

        Thread.sleep(1000);
    }

    private static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 subscribeOn으로 쓰레드를 지정해주면, 해당 쓰레드에서 실행이 된다.

 

 

  • backPressure - Flux

 

BackPressure를 수행하는 코드를 작성해보자.

지금 Subscriber는 요청을 한 번만 하기 때문에 Subscriber를 다시 작성해야 한다.

 

다시 작성한 Subscribe이다.

onNext를 호출 당할 때마다, request로 다시 호출하여 계속 반복이 된다.

@Slf4j
public class ContinuousRequestSubscriber<T> implements Subscriber<T> {

    private final Integer count = 1;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }

    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);

        Thread.sleep(1000);
        subscription.request(1);
        log.info("request: {}", count);
    }

    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("complete");
    }
}

 

이렇게 작성된 Subscriber를 

@Slf4j
public class FluxContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        getItems().subscribe(new ContinuousRequestSubscriber<>());
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이 코드로 실행해본다면 다음과 같이 나오는 것을 볼 수 있다.

이렇게 Request 한 만큼만 데이터를 주어, 조절하는 것을 볼 수 있다.

 

  • error - Flux

이번에는 에러에 대한 처리이다.

Flux는 에러를 만나면 더 이상 진행하지 않는다.

@Slf4j
public class FluxErrorExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.create(fluxsink -> {
            fluxsink.next(0);
            fluxsink.next(1);
            var error = new RuntimeException("error in flux");
            fluxsink.error(error);
            fluxsink.next(2);
        });
    }
}

 

이렇게 0과 1을 주고 다음으로 에러를 넘기면 다음과 같이 나온다.

 

0과 1만 나오고 에러를 반환한 후 종료되는 것을 볼 수 있다.

 

  • complete - Flux

complete하는 방법으로 complete를 넘기면 종료된다.

@Slf4j
public class FluxCompleteExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.create(fluxSink -> {
            fluxSink.complete();
        });
    }
}

 

값을 넘기지 않아도 바로 종료되는 것을 볼 수 있다.

 

Mono

0..1개의 item을 전달한다.

에러가 발생하면 error signal을 전달하고 종료되며, 모든 item을 전달하면 complete signal을 전달하고 종료된다.

 

하나의 item만 전달하기 때문에 next 실행 후 바로 complete가 보장된다.

혹은 전달하지 않고 complete를 하면 값이 없다는 것을 의미한다.

이러한 이유로 Flux를 사용하지 않고 Mono를 사용하는 경우가 있다.

데이터베이스에서 하나의 값을 가져오는 경우 등에서 사용한다.

 

  • subscribe - Mono
@Slf4j
public class MonoSimpleExample {

    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");

        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));

        log.info("end main");
        Thread.sleep(1000);
    }

    private static Mono<Integer> getItems(){
        return Mono.create(monoSink -> {
            monoSink.success(1);
        });
    }
}

 

일단 바로 사용해보도록 하자.

 

이렇게 하나의 값을 전달하고 종료되는 것을 볼 수 있다.

 

monoSink.success(2);

라는 코드를 추가해도 값이 전달되지 않고 그 전에 종료된다.

 

  • from - Mono

Flux를 Mono로 변환하는 방법이다.

첫 번째 값만 전달하게 된다.

 

from의 파라미터에 Flux를 넘겨주면 된다.

@Slf4j
public class FluxToMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        Mono.from(getItems())
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

 

1만 넘어가는 것을 볼 수 있다.

 

  • collectList - Mono

하나의 값만 전달하는 것이 아니라, 값들을 모두 모아 리스트로 전달하는 방법이다.

@Slf4j
public class FluxToListMonoExample {

    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .collectList()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }

    private static Flux<Integer> getItems(){
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}

 

이렇게 작성하면 수들을 모두 모아 리스트로 전달하게 된다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Mutiny  (0) 2024.03.14
RxJava Reactor  (2) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
728x90

이번에 리액티브 프로그래밍을 공부하다가, 이런 용어들이 나왔다.

중간에 흐름을 조절해 준다는 역할을 한다고 하는데, 이해가 잘 되지 않아 일단 정리해 보고 넘어가려 한다.

 

 

우선 이해하기 쉽게 그림이다.

 

  • Publisher

Publisher가 Subscriber를 Subscribe한다.

Publisher는 request를 받으면 데이터를 생성하여 보낸다.

  • Subscriber

Subscriber가 Subscription을 onSubscribe 한다.

Subscriber는 필요할 때 Subscribe의 request를 통해 Publisher에게 데이터를 요청한다.

Subscriber는 onNext로 데이터를 받는다.

  • Subscription

Subscription은 Subscriber에 의해 등록된다.

 

 

모든 요청이 성공적으로 완료된다면 onComplete를, 요청이 실패하면 onError를 호출하고 흐름을 종료한다.

 

 

당연히 Publisher는 여러 개의 Subscriber를 Subscribe 가능하다.

 

하나씩 살펴보자

 

Publisher

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

 

하나의 메서드 밖에 없다.

subscribe로 Subscriber를 등록하면 된다.

 

Subscription

public static interface Subscription {
    public void request(long n);

    public void cancel();
}

 

 

  • requst

Subscriber가 데이터를 처리 가능 할 때 request를 호출한다.

파라미터 n은 Publisher에게 요청하는 데이터의 개수이다.

  • cancel

Publisher에게 데이터를 그만 보내라고 요청하는 메서드이다.

 

Subscriber

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    
    public void onNext(T item);

    public void onError(Throwable throwable);

    public void onComplete();
}

 

  • onSubscribe

Subscription을 파라미터로 받아 request를 호출한다.

Subscription의 request를 호출하는 것은 온전히 Subscriber의 결정이며, 호출되기 전까지는 어떤 데이터도 흐르지 않는다.

  • onNext

Publisher가 보낸 데이터이다.

  • onError

에러로 종료

  • onComplete

성공적으로 종료

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
CompletableFuture 인터페이스  (1) 2024.03.06
CompletionStage 인터페이스  (1) 2024.03.05

+ Recent posts