Pivotal에서 개발한 Project reactor에 대하여 알아보자.
우선 Mono와 Flux를 implements 했다.
우선 아래는 이번에 사용할 Subscriber 코드이다.
@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber <T> implements Subscriber<T> {
private final Integer count;
@Override
public void onSubscribe(Subscription s) {
log.info("subscribe");
s.request(count);
log.info("request: {}", count);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("item: {}", t);
Thread.sleep(100);
}
@Override
public void onError(Throwable t) {
log.error("error: {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
reactivestreams의 Subscriber이며, 저번에 구현했던 Subscriber와 굉장히 유사한 것을 볼 수 있다.
Flux
Flux는 0~n 개의 item을 전달한다.
만약 에러가 발생하면 error signal을 전달하고 종료하며, 모든 item을 전달했다면 complete signal을 전달하고 종료한다.
backPressure를 지원한다.
- subscribe - Flux
우선 바로 사용해보자.
@Slf4j
public class FluxSimpleExample {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
getItems()
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
Thread.sleep(1000);
}
public static Flux<Integer> getItems(){
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
그냥 간단하게 하나씩 넘겨주는 것이고 이것에 대한 결과는 아래와 같다.

모두 main 쓰레드에서 실행되는 것을 볼 수 있다.
- subscribeOn - Flux
다른 쓰레드에서 실행시키고 싶다면 아래의 subscribeOn을 사용해야 한다.
@Slf4j
public class FluxSimpleSubscribeOnExample {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
getItems()
.map(i -> {
log.info("map {}", i);
return i;
})
.subscribeOn(Schedulers.single())
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
Thread.sleep(1000);
}
private static Flux<Integer> getItems(){
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
이렇게 subscribeOn으로 쓰레드를 지정해주면, 해당 쓰레드에서 실행이 된다.

- backPressure - Flux
BackPressure를 수행하는 코드를 작성해보자.
지금 Subscriber는 요청을 한 번만 하기 때문에 Subscriber를 다시 작성해야 한다.
다시 작성한 Subscribe이다.
onNext를 호출 당할 때마다, request로 다시 호출하여 계속 반복이 된다.
@Slf4j
public class ContinuousRequestSubscriber<T> implements Subscriber<T> {
private final Integer count = 1;
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
log.info("subscribe");
s.request(count);
log.info("request: {}", count);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("item: {}", t);
Thread.sleep(1000);
subscription.request(1);
log.info("request: {}", count);
}
@Override
public void onError(Throwable t) {
log.error("error: {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
이렇게 작성된 Subscriber를
@Slf4j
public class FluxContinuousRequestSubscriberExample {
public static void main(String[] args) {
getItems().subscribe(new ContinuousRequestSubscriber<>());
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
이 코드로 실행해본다면 다음과 같이 나오는 것을 볼 수 있다.

이렇게 Request 한 만큼만 데이터를 주어, 조절하는 것을 볼 수 있다.
- error - Flux
이번에는 에러에 대한 처리이다.
Flux는 에러를 만나면 더 이상 진행하지 않는다.
@Slf4j
public class FluxErrorExample {
public static void main(String[] args) {
log.info("start main");
getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems(){
return Flux.create(fluxsink -> {
fluxsink.next(0);
fluxsink.next(1);
var error = new RuntimeException("error in flux");
fluxsink.error(error);
fluxsink.next(2);
});
}
}
이렇게 0과 1을 주고 다음으로 에러를 넘기면 다음과 같이 나온다.

0과 1만 나오고 에러를 반환한 후 종료되는 것을 볼 수 있다.
- complete - Flux
complete하는 방법으로 complete를 넘기면 종료된다.
@Slf4j
public class FluxCompleteExample {
public static void main(String[] args) {
log.info("start main");
getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems(){
return Flux.create(fluxSink -> {
fluxSink.complete();
});
}
}
값을 넘기지 않아도 바로 종료되는 것을 볼 수 있다.

Mono
0..1개의 item을 전달한다.
에러가 발생하면 error signal을 전달하고 종료되며, 모든 item을 전달하면 complete signal을 전달하고 종료된다.
하나의 item만 전달하기 때문에 next 실행 후 바로 complete가 보장된다.
혹은 전달하지 않고 complete를 하면 값이 없다는 것을 의미한다.
이러한 이유로 Flux를 사용하지 않고 Mono를 사용하는 경우가 있다.
데이터베이스에서 하나의 값을 가져오는 경우 등에서 사용한다.
- subscribe - Mono
@Slf4j
public class MonoSimpleExample {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
getItems()
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
Thread.sleep(1000);
}
private static Mono<Integer> getItems(){
return Mono.create(monoSink -> {
monoSink.success(1);
});
}
}
일단 바로 사용해보도록 하자.

이렇게 하나의 값을 전달하고 종료되는 것을 볼 수 있다.
monoSink.success(2);
라는 코드를 추가해도 값이 전달되지 않고 그 전에 종료된다.
- from - Mono
Flux를 Mono로 변환하는 방법이다.
첫 번째 값만 전달하게 된다.
from의 파라미터에 Flux를 넘겨주면 된다.
@Slf4j
public class FluxToMonoExample {
public static void main(String[] args) {
log.info("start main");
Mono.from(getItems())
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
1만 넘어가는 것을 볼 수 있다.

- collectList - Mono
하나의 값만 전달하는 것이 아니라, 값들을 모두 모아 리스트로 전달하는 방법이다.
@Slf4j
public class FluxToListMonoExample {
public static void main(String[] args) {
log.info("start main");
getItems()
.collectList()
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems(){
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
이렇게 작성하면 수들을 모두 모아 리스트로 전달하게 된다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글
Mutiny (0) | 2024.03.14 |
---|---|
RxJava Reactor (2) | 2024.03.14 |
HotPublisher 구현 (0) | 2024.03.13 |
ColdPublisher 구현 (0) | 2024.03.13 |
Publisher, Subscriber에 대하여 (0) | 2024.03.12 |