728x90
Publisher 중에서 ColdPublisher를 구현해보자.
ColdPublisher에 대해 먼저 알아보자.
소비자가 구독 할 때마다 데이터를 처음부터 새로 통지하는 것을 ColdPublisher라고 한다.
소비자는 구독 시점과 상관 없이 통지된 데이터를 처음부터 전달 받게 된다.
모든 소비자가 같은 데이터를 전달받게 될 것이다.
간단하게 한 번 구현해보자.
- Publisher, Subscription
우선 Publisher와 Subscription이다.
public class SimpleColdPublisher implements Flow.Publisher<Integer>{
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
var iterator = Collections.synchronizedList(
IntStream.range(1, 10).boxed().collect(Collectors.toList())
).iterator();
var subscription = new SimpleColdSubscription(iterator, subscriber);
subscriber.onSubscribe(subscription);
}
public class SimpleColdSubscription implements Flow.Subscription{
private final Iterator<Integer> iterator;
private final Flow.Subscriber<? super Integer> subscriber;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public SimpleColdSubscription(Iterator<Integer> iterator, Flow.Subscriber<? super Integer> subscriber) {
this.iterator = iterator;
this.subscriber = subscriber;
}
@Override
public void request(long n) {
executor.submit(() -> {
for(int i = 0; i < n; i++){
if(iterator.hasNext()){
var number = iterator.next();
iterator.remove();
subscriber.onNext(number);
}
else{
subscriber.onComplete();
executor.shutdown();
break;
}
}
});
}
@Override
public void cancel() {
subscriber.onComplete();
}
}
}
ColdPublisher답게 전달할 데이터의 배열을 넘겨주게 된다.
해당 iterator의 데이터를 전부 전달해주고 종료된다.
- Subscriber
그 다음은 Subscriber이다.
subscription을 인자로 받는 onSubscribe가 있으며, onNext를 호출 할 때마다 request로 데이터를 하나씩 받아온다.
@Slf4j
public class SimpleNamedSubscriber<T> implements Flow.Subscriber<T> {
private Flow.Subscription subscription;
private final String name;
public SimpleNamedSubscriber(String name){
this.name = name;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
log.info("onSubscribe");
}
@Override
public void onNext(T item) {
log.info("name: {}, onNext: {}", name, item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
log.error("onError: {}", throwable.getMessage());
}
@Override
public void onComplete() {
log.info("onComplete");
}
public void cancel(){
log.info("cancel");
this.subscription.cancel();
}
}
해당 코드를 실행해보자.
public class SimpleColdPublisherMain {
@SneakyThrows
public static void main(String[] args) {
//create Publisher
var publisher = new SimpleColdPublisher();
var subscriber1 = new SimpleNamedSubscriber<Integer>("subscriber1");
publisher.subscribe(subscriber1);
Thread.sleep(5000);
var subscriber2 = new SimpleNamedSubscriber<Integer>("subscriber2");
publisher.subscribe(subscriber2);
}
}
다음과 같은 코드로 Publisher와 Subscriber를 테스트 해보았다.
이렇게 subscriber1, subscriber2가 다른 시간에 subscribe를 했음에도 같은 결과가 나오는 것을 볼 수 있다.
'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글
Project reactor (0) | 2024.03.14 |
---|---|
HotPublisher 구현 (0) | 2024.03.13 |
Publisher, Subscriber에 대하여 (0) | 2024.03.12 |
CompletableFuture를 사용한 성능튜닝 (0) | 2024.03.06 |
CompletableFuture 인터페이스 (1) | 2024.03.06 |