publicclassSimpleColdPublisherMain{
@SneakyThrowspublicstaticvoidmain(String[] args){
//create Publishervar publisher = new SimpleColdPublisher();
var subscriber1 = new SimpleNamedSubscriber<Integer>("subscriber1");
publisher.subscribe(subscriber1);
Thread.sleep(5000);
var subscriber2 = new SimpleNamedSubscriber<Integer>("subscriber2");
publisher.subscribe(subscriber2);
}
}
다음과 같은 코드로 Publisher와 Subscriber를 테스트 해보았다.
이렇게 subscriber1, subscriber2가 다른 시간에 subscribe를 했음에도 같은 결과가 나오는 것을 볼 수 있다.
publicclassCompletableFuture<T> implementsFuture<T>, CompletionStage<T>{
publicstatic <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){
return asyncSupplyStage(ASYNC_POOL, supplier);
}
publicstatic CompletableFuture<Void> runAsync(Runnable runnable){
return asyncRunStage(ASYNC_POOL, runnable);
}
publicbooleancomplete(T value){
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
publicbooleanisCompletedExceptionally(){
Object r;
return ((r = result) instanceof AltResult) && r != NIL;
}
publicstatic CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){
return andTree(cfs, 0, cfs.length - 1);
}
publicstatic CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs){
int n; Object r;
if ((n = cfs.length) <= 1)
return (n == 0)
? new CompletableFuture<Object>()
: uniCopyStage(cfs[0]);
for (CompletableFuture<?> cf : cfs)
if ((r = cf.result) != null)
returnnew CompletableFuture<Object>(encodeRelay(r));
cfs = cfs.clone();
CompletableFuture<Object> d = new CompletableFuture<>();
for (CompletableFuture<?> cf : cfs)
cf.unipush(new AnyOf(d, cf, cfs));
// If d was completed while we were adding completions, we should// clean the stack of any sources that may have had completions// pushed on their stack after d was completed.if (d.result != null)
for (int i = 0, len = cfs.length; i < len; i++)
if (cfs[i].result != null)
for (i++; i < len; i++)
if (cfs[i].result == null)
cfs[i].cleanStack();
return d;
}
}
publicinterfaceCompletionStage<T> {
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
}
CompletionStage 인터페이스는 이렇게 구성이 되어 있다.
차례로 내려가면서 실행하기 때문에 각각 파이프 하나의 단계라고 생각하면 될 것이다.
저번 내용과 다르게, 결과를 직접적으로 가져올 수 없기 때문에 비동기 프로그래밍이 가능하게 된다.
또한 Non-blocking으로 프로그래밍하기 위해서는 별도의 쓰레드가 필요하다.
Completable은 내부적으로 FokJoinPool을 사용한다.
할당된 CPU 코어 - 1 에 해당하는 쓰레드를 관리하는 것이다.
이렇게 다른 쓰레드에서 실행이 되기 때문에 Non-blocking 프로그래밍도 가능하게 해준다.
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor);
위와는 다르게 Future가 종료되지 않았다면, thenAccept는 callee에서 실행이 되게 된다.
이를 통해, Async는 그냥 thread pool에서 가져와서 실행이 되며 Async가 아니면 stage의 상태에 따라 나뉘는 것을 볼 수 있다.
stage가 실행중이라면, 호출한 caller 쓰레드에서 실행이 된다.
stage가 실행중이 아니라면, 호출된 caller 쓰레드에서 실행이 되게 된다.
위에서 마지막에 있던 메서드 중에 파라미터가 하나 더 있던 메서드가 있었다.
executor를 넘겨주는 것인데, 해당 action이 실행될 쓰레드를 지정해주는 것이다.
@Slf4jpublicclassB{
@SneakyThrowspublicstaticvoidmain(String[] args){
var single = Executors.newSingleThreadExecutor();
var fixed = Executors.newFixedThreadPool(10);
log.info("start main");
CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> {
log.info("supplyAsync");
return1;
});
stage
.thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
}, fixed).thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
}, single);
log.info("after thenAccept");
Thread.sleep(2000);
single.shutdown();
fixed.shutdown();
}
}
해당 코드를 실행해보면
이렇게 지정된 쓰레드에서 실행이 되는 것을 볼 수 있다.
thenApply[Async]
Function를 파라미터로 받는다.
이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 값을 반환한다.
다음 task에게 반환했던 값이 전달된다.
값을 변형해서 전달해야 하는 경우 유용하다.
@FunctionalInterfacepublicinterfaceFunction<T, R> {
R apply(T t);
}
해당 Function을 파라미터로 받는다.
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,
Executor executor);
메서드가 다음과 같이 있다.
해당 메서드들을 하면서 확인해보자.
@Slf4jpublicclassC{
publicstaticvoidmain(String[] args)throws InterruptedException {
CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 1);
stage.thenApplyAsync(value -> {
var next = value + 1;
log.info("in thenApplyAsync : {}", next);
return next;
}).thenApplyAsync(value -> {
var next = "result: " + value;
log.info("in thenApplyAsync2 : {}", next);
return next;
}).thenApplyAsync(value -> {
var next = value.equals("result: 2");
log.info("in thenApplyAsync3 : {}", next);
return next;
}).thenAcceptAsync(value -> {
log.info("final: {}", value);
});
Thread.sleep(2000);
}
}
해당 코드를 실행하면 다음과 같다.
thenApply는 다른 타입을 주고 받는 것이 가능한 것을 볼 수 있다.
thenCompose[Async]
Function를 파라미터로 받는다.
이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 CompletionStage를 반환한다.
반환한 CompletionStage가 done 상태가 되면 값을 다음 task에 전달한다.
다른 future를 반환해야 하는 경우 유용하다.
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor);