728x90

Multi

0..n개의 item을 전달

에러가 발생하면 error signal을 전달하고, 모든 item을 전달하면 complete signal을 전달하고 종료한다.

backPressure를 지원하며, Reactor의 Flux와 유사하다.

 

Multi의 Subscriber이다.

다른 Subscriber들과 메서드 이름이 좀 다르긴 하지만, 기능은 생각하는 그대로이다.

@Slf4j
@RequiredArgsConstructor
public class SimpleMultiSubscriber<T> implements MultiSubscriber<T> {

    private final Integer count;

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("fail: {}", failure.getMessage());
    }

    @Override
    public void onCompletion() {
        log.info("completion");
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(count);
        log.info("subscribe");
    }
}

 

예제로 실행을 해보자.

@Slf4j
public class MultiExample {

    public static void main(String[] args) {
        getItems()
                .subscribe()
                .withSubscriber(
                        new SimpleMultiSubscriber<>(Integer.MAX_VALUE)
                );
    }

    private static Multi<Integer> getItems(){
        return Multi.createFrom().items(1, 2, 3, 4, 5);
    }
}

 

subscribe에 Subscriber를 넣는 것이 아니라, withSubscriber에 넘겨야 한다.

 

Uni

0..1개의 item을 전달

에러가 발생하면 error signal 전달하고 종료, 모든 item을 전달했다면 complete signal을 전달하고 종료한다.

Reactor의 Mono와 유사하다.

 

@Slf4j
@RequiredArgsConstructor
public class SimpleUniSubscriber<T> implements UniSubscriber<T> {

    private final Integer count;
    private UniSubscription subscription;

    @Override
    public void onSubscribe(UniSubscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
        log.info("subscribe");
    }

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("error: {}", failure.getMessage());
    }
}

 

하나면 보내면 complete이기 때문에 따로 complete 메서드는 없다.

 

아래의 코드로 실행을 해보자.

@Slf4j
public class UniExample {

    public static void main(String[] args) {
        getItem()
                .subscribe()
                .withSubscriber(new SimpleUniSubscriber<>(Integer.MAX_VALUE));
    }

    private static Uni<Integer> getItem(){
        return Uni.createFrom().item(1);
    }
}

 

그럼 하나의 값이 나오게 될 것이다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
RxJava Reactor  (2) 2024.03.14
Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
728x90

이번에 리액티브 프로그래밍을 공부하다가, 이런 용어들이 나왔다.

중간에 흐름을 조절해 준다는 역할을 한다고 하는데, 이해가 잘 되지 않아 일단 정리해 보고 넘어가려 한다.

 

 

우선 이해하기 쉽게 그림이다.

 

  • Publisher

Publisher가 Subscriber를 Subscribe한다.

Publisher는 request를 받으면 데이터를 생성하여 보낸다.

  • Subscriber

Subscriber가 Subscription을 onSubscribe 한다.

Subscriber는 필요할 때 Subscribe의 request를 통해 Publisher에게 데이터를 요청한다.

Subscriber는 onNext로 데이터를 받는다.

  • Subscription

Subscription은 Subscriber에 의해 등록된다.

 

 

모든 요청이 성공적으로 완료된다면 onComplete를, 요청이 실패하면 onError를 호출하고 흐름을 종료한다.

 

 

당연히 Publisher는 여러 개의 Subscriber를 Subscribe 가능하다.

 

하나씩 살펴보자

 

Publisher

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

 

하나의 메서드 밖에 없다.

subscribe로 Subscriber를 등록하면 된다.

 

Subscription

public static interface Subscription {
    public void request(long n);

    public void cancel();
}

 

 

  • requst

Subscriber가 데이터를 처리 가능 할 때 request를 호출한다.

파라미터 n은 Publisher에게 요청하는 데이터의 개수이다.

  • cancel

Publisher에게 데이터를 그만 보내라고 요청하는 메서드이다.

 

Subscriber

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    
    public void onNext(T item);

    public void onError(Throwable throwable);

    public void onComplete();
}

 

  • onSubscribe

Subscription을 파라미터로 받아 request를 호출한다.

Subscription의 request를 호출하는 것은 온전히 Subscriber의 결정이며, 호출되기 전까지는 어떤 데이터도 흐르지 않는다.

  • onNext

Publisher가 보낸 데이터이다.

  • onError

에러로 종료

  • onComplete

성공적으로 종료

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
CompletableFuture 인터페이스  (1) 2024.03.06
CompletionStage 인터페이스  (1) 2024.03.05

+ Recent posts