728x90

Publisher 중에서 ColdPublisher를 구현해보자.

 

ColdPublisher에 대해 먼저 알아보자.

 

소비자가 구독 할 때마다 데이터를 처음부터 새로 통지하는 것을 ColdPublisher라고 한다.

소비자는 구독 시점과 상관 없이 통지된 데이터를 처음부터 전달 받게 된다.

 

모든 소비자가 같은 데이터를 전달받게 될 것이다.

 

간단하게 한 번 구현해보자.

 

  • Publisher, Subscription

우선 Publisher와 Subscription이다.

public class SimpleColdPublisher implements Flow.Publisher<Integer>{

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        var iterator = Collections.synchronizedList(
                IntStream.range(1, 10).boxed().collect(Collectors.toList())
        ).iterator();
        var subscription = new SimpleColdSubscription(iterator, subscriber);
        subscriber.onSubscribe(subscription);
    }

    public class SimpleColdSubscription implements Flow.Subscription{

        private final Iterator<Integer> iterator;
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        public SimpleColdSubscription(Iterator<Integer> iterator, Flow.Subscriber<? super Integer> subscriber) {
            this.iterator = iterator;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            executor.submit(() -> {
               for(int i = 0; i < n; i++){
                   if(iterator.hasNext()){
                       var number = iterator.next();
                       iterator.remove();
                       subscriber.onNext(number);
                   }
                   else{
                       subscriber.onComplete();
                       executor.shutdown();
                       break;
                   }
               }
            });
        }

        @Override
        public void cancel() {
            subscriber.onComplete();
        }
    }
}

 

ColdPublisher답게 전달할 데이터의 배열을 넘겨주게 된다.

 

해당 iterator의 데이터를 전부 전달해주고 종료된다.

 

 

  • Subscriber

그 다음은 Subscriber이다.

subscription을 인자로 받는 onSubscribe가 있으며, onNext를 호출 할 때마다 request로 데이터를 하나씩 받아온다.

 

@Slf4j
public class SimpleNamedSubscriber<T> implements Flow.Subscriber<T> {

    private Flow.Subscription subscription;
    private final String name;

    public SimpleNamedSubscriber(String name){
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
        log.info("onSubscribe");
    }

    @Override
    public void onNext(T item) {
        log.info("name: {}, onNext: {}", name, item);
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("onError: {}", throwable.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("onComplete");
    }
    
    public void cancel(){
        log.info("cancel");
        this.subscription.cancel();
    }
}

 

 

해당 코드를 실행해보자.

public class SimpleColdPublisherMain {

    @SneakyThrows
    public static void main(String[] args) {
        //create Publisher
        var publisher = new SimpleColdPublisher();

        var subscriber1 = new SimpleNamedSubscriber<Integer>("subscriber1");
        publisher.subscribe(subscriber1);

        Thread.sleep(5000);

        var subscriber2 = new SimpleNamedSubscriber<Integer>("subscriber2");
        publisher.subscribe(subscriber2);
    }
}

 

다음과 같은 코드로 Publisher와 Subscriber를 테스트 해보았다.

 

 

이렇게 subscriber1, subscriber2가 다른 시간에 subscribe를 했음에도 같은 결과가 나오는 것을 볼 수 있다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
CompletableFuture 인터페이스  (1) 2024.03.06

+ Recent posts