CompletableFuture 클래스
우선 주요 메서드 먼저 살펴보고 가도록 하자.
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(ASYNC_POOL, runnable);
}
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
public boolean isCompletedExceptionally() {
Object r;
return ((r = result) instanceof AltResult) && r != NIL;
}
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
int n; Object r;
if ((n = cfs.length) <= 1)
return (n == 0)
? new CompletableFuture<Object>()
: uniCopyStage(cfs[0]);
for (CompletableFuture<?> cf : cfs)
if ((r = cf.result) != null)
return new CompletableFuture<Object>(encodeRelay(r));
cfs = cfs.clone();
CompletableFuture<Object> d = new CompletableFuture<>();
for (CompletableFuture<?> cf : cfs)
cf.unipush(new AnyOf(d, cf, cfs));
// If d was completed while we were adding completions, we should
// clean the stack of any sources that may have had completions
// pushed on their stack after d was completed.
if (d.result != null)
for (int i = 0, len = cfs.length; i < len; i++)
if (cfs[i].result != null)
for (i++; i < len; i++)
if (cfs[i].result == null)
cfs[i].cleanStack();
return d;
}
}
supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
이렇게 구성이 되어 있었다.
보면 알 수 있듯이, 파라미터를 받지 않고도 결과를 만들어서 다음 task에 전달해준다.
runAsync
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(ASYNC_POOL, runnable);
}
Runnable과 비슷하다.
값을 받지도, 값을 리턴하지도 않고 수행만 하게 된다.
complete
CompletableFuture가 완료되지 않았다면 주어진 값으로 채운다.
리턴되는 Boolean은 complete에 의해 상태가 바뀌었다면 true, 아니라면 false를 반환한다.
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
isCompletedExceptionally
CompletableFuture가 에러로 인해 중지가 되었는지 Boolean으로 반환하는 메서드이다.
var futureWithException = CompletableFuture.supplyAsync(() -> 1 / 0);
Thread.sleep(1000);
assert futureWithException.isDone();
assert futureWithException.isCompletedExceptionally();
이런식의 코드를 작성하여 확인 할 수 있다.
allOf
여러개의 CompletableFuture를 모아서 하나의 CompletableFuture로 변환할 수 있다.
모든 CompletableFuture가 완료되면 상태가 done으로 변경된다.
반환하는 값은 없기 때문에 각각의 값에 다시 접근하여 get으로 값을 가져와야 한다.
allOf를 테스트 해보기 위해 코드를 작성해서 확인해보자.
@Slf4j
public class A {
@SneakyThrows
public static void main(String[] args) {
var startTime = System.currentTimeMillis();
var firstFuture = waitAndReturn(100, 1);
var secondFuture = waitAndReturn(500, 2);
var thirdFuture = waitAndReturn(1000, 3);
CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture)
.thenAcceptAsync(v -> {
try{
log.info("first: {}", firstFuture.get());
log.info("second: {}", secondFuture.get());
log.info("third: {}", thirdFuture.get());
}catch (Exception e){
throw new RuntimeException(e);
}
}).join();
var endTime = System.currentTimeMillis();
log.info("time: {}", endTime - startTime);
}
public static CompletableFuture<Integer> waitAndReturn(int time, int value) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return value;
});
}
}
해당 코드를 실행하면 다음과 같이 나온다.

차례차례 실행이 된 100 + 500 + 1000이 아닌 1000에 가까운 값이 나오는 것을 볼 수 있다.
anyOf
allOf와는 다르게 가장 먼저 끝난 Future의 값을 제공해준다.
방금과 비슷한 코드를 실행해보면
@Slf4j
public class A {
@SneakyThrows
public static void main(String[] args) {
var startTime = System.currentTimeMillis();
var firstFuture = waitAndReturn(100, 1);
var secondFuture = waitAndReturn(500, 2);
var thirdFuture = waitAndReturn(1000, 3);
CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture)
.thenAcceptAsync(v -> {
try{
log.info("Hi FirstValue");
log.info("value: {}", v);
}catch (Exception e){
throw new RuntimeException(e);
}
}).join();
var endTime = System.currentTimeMillis();
log.info("time: {}", endTime - startTime);
}
public static CompletableFuture<Integer> waitAndReturn(int time, int value) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return value;
});
}
}

가장 빨리 실행이 되는 Future만 가져오는 것을 볼 수 있다.
'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글
Publisher, Subscriber에 대하여 (0) | 2024.03.12 |
---|---|
CompletableFuture를 사용한 성능튜닝 (0) | 2024.03.06 |
CompletionStage 인터페이스 (1) | 2024.03.05 |
Future 인터페이스 (1) | 2024.01.09 |
Blocking, Non-blocking이란 무엇일까? (0) | 2024.01.08 |