728x90

Multi

0..n개의 item을 전달

에러가 발생하면 error signal을 전달하고, 모든 item을 전달하면 complete signal을 전달하고 종료한다.

backPressure를 지원하며, Reactor의 Flux와 유사하다.

 

Multi의 Subscriber이다.

다른 Subscriber들과 메서드 이름이 좀 다르긴 하지만, 기능은 생각하는 그대로이다.

@Slf4j
@RequiredArgsConstructor
public class SimpleMultiSubscriber<T> implements MultiSubscriber<T> {

    private final Integer count;

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("fail: {}", failure.getMessage());
    }

    @Override
    public void onCompletion() {
        log.info("completion");
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(count);
        log.info("subscribe");
    }
}

 

예제로 실행을 해보자.

@Slf4j
public class MultiExample {

    public static void main(String[] args) {
        getItems()
                .subscribe()
                .withSubscriber(
                        new SimpleMultiSubscriber<>(Integer.MAX_VALUE)
                );
    }

    private static Multi<Integer> getItems(){
        return Multi.createFrom().items(1, 2, 3, 4, 5);
    }
}

 

subscribe에 Subscriber를 넣는 것이 아니라, withSubscriber에 넘겨야 한다.

 

Uni

0..1개의 item을 전달

에러가 발생하면 error signal 전달하고 종료, 모든 item을 전달했다면 complete signal을 전달하고 종료한다.

Reactor의 Mono와 유사하다.

 

@Slf4j
@RequiredArgsConstructor
public class SimpleUniSubscriber<T> implements UniSubscriber<T> {

    private final Integer count;
    private UniSubscription subscription;

    @Override
    public void onSubscribe(UniSubscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
        log.info("subscribe");
    }

    @Override
    public void onItem(T item) {
        log.info("item: {}", item);
    }

    @Override
    public void onFailure(Throwable failure) {
        log.error("error: {}", failure.getMessage());
    }
}

 

하나면 보내면 complete이기 때문에 따로 complete 메서드는 없다.

 

아래의 코드로 실행을 해보자.

@Slf4j
public class UniExample {

    public static void main(String[] args) {
        getItem()
                .subscribe()
                .withSubscriber(new SimpleUniSubscriber<>(Integer.MAX_VALUE));
    }

    private static Uni<Integer> getItem(){
        return Uni.createFrom().item(1);
    }
}

 

그럼 하나의 값이 나오게 될 것이다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Java NIO  (0) 2024.03.18
JAVA IO  (0) 2024.03.18
RxJava Reactor  (2) 2024.03.14
Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13

+ Recent posts