728x90

CompletableFuture 클래스

우선 주요 메서드 먼저 살펴보고 가도록 하자.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(ASYNC_POOL, supplier);
    }

	public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(ASYNC_POOL, runnable);
    }
    
    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }
    
    public boolean isCompletedExceptionally() {
        Object r;
        return ((r = result) instanceof AltResult) && r != NIL;
    }
    
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
    
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        int n; Object r;
        if ((n = cfs.length) <= 1)
            return (n == 0)
                ? new CompletableFuture<Object>()
                : uniCopyStage(cfs[0]);
        for (CompletableFuture<?> cf : cfs)
            if ((r = cf.result) != null)
                return new CompletableFuture<Object>(encodeRelay(r));
        cfs = cfs.clone();
        CompletableFuture<Object> d = new CompletableFuture<>();
        for (CompletableFuture<?> cf : cfs)
            cf.unipush(new AnyOf(d, cf, cfs));
        // If d was completed while we were adding completions, we should
        // clean the stack of any sources that may have had completions
        // pushed on their stack after d was completed.
        if (d.result != null)
            for (int i = 0, len = cfs.length; i < len; i++)
                if (cfs[i].result != null)
                    for (i++; i < len; i++)
                        if (cfs[i].result == null)
                            cfs[i].cleanStack();
        return d;
    }
}

 

 

supplyAsync

 

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(ASYNC_POOL, supplier);
    }

 

이렇게 구성이 되어 있었다.

 

보면 알 수 있듯이, 파라미터를 받지 않고도 결과를 만들어서 다음 task에 전달해준다.

 

runAsync

 

	public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(ASYNC_POOL, runnable);
    }

 

Runnable과 비슷하다.

값을 받지도, 값을 리턴하지도 않고 수행만 하게 된다.

 

complete

 

CompletableFuture가 완료되지 않았다면 주어진 값으로 채운다.

리턴되는 Boolean은 complete에 의해 상태가 바뀌었다면 true, 아니라면 false를 반환한다.

    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }

 

isCompletedExceptionally

 

CompletableFuture가 에러로 인해 중지가 되었는지 Boolean으로 반환하는 메서드이다.

var futureWithException = CompletableFuture.supplyAsync(() -> 1 / 0);

Thread.sleep(1000);

assert futureWithException.isDone();
assert futureWithException.isCompletedExceptionally();

 

이런식의 코드를 작성하여 확인 할 수 있다.

 

allOf

 

여러개의 CompletableFuture를 모아서 하나의 CompletableFuture로 변환할 수 있다.

모든 CompletableFuture가 완료되면 상태가 done으로 변경된다.

반환하는 값은 없기 때문에 각각의 값에 다시 접근하여 get으로 값을 가져와야 한다.

 

allOf를 테스트 해보기 위해 코드를 작성해서 확인해보자.

@Slf4j
public class A {

    @SneakyThrows
    public static void main(String[] args) {

        var startTime = System.currentTimeMillis();

        var firstFuture = waitAndReturn(100, 1);
        var secondFuture = waitAndReturn(500, 2);
        var thirdFuture = waitAndReturn(1000, 3);

        CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture)
                .thenAcceptAsync(v -> {
                    try{
                        log.info("first: {}", firstFuture.get());
                        log.info("second: {}", secondFuture.get());
                        log.info("third: {}", thirdFuture.get());
                    }catch (Exception e){
                        throw new RuntimeException(e);
                    }
                }).join();

        var endTime = System.currentTimeMillis();

        log.info("time: {}", endTime - startTime);
    }

    public static CompletableFuture<Integer> waitAndReturn(int time, int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return value;
        });
    }
}

 

해당 코드를 실행하면 다음과 같이 나온다.

 

차례차례 실행이 된 100 + 500 + 1000이 아닌 1000에 가까운 값이 나오는 것을 볼 수 있다.

 

anyOf

 

allOf와는 다르게 가장 먼저 끝난 Future의 값을 제공해준다.

 

방금과 비슷한 코드를 실행해보면

@Slf4j
public class A {

    @SneakyThrows
    public static void main(String[] args) {

        var startTime = System.currentTimeMillis();

        var firstFuture = waitAndReturn(100, 1);
        var secondFuture = waitAndReturn(500, 2);
        var thirdFuture = waitAndReturn(1000, 3);

        CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture)
                .thenAcceptAsync(v -> {
                    try{
                        log.info("Hi FirstValue");
                        log.info("value: {}", v);
                    }catch (Exception e){
                        throw new RuntimeException(e);
                    }
                }).join();

        var endTime = System.currentTimeMillis();

        log.info("time: {}", endTime - startTime);
    }

    public static CompletableFuture<Integer> waitAndReturn(int time, int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return value;
        });
    }
}

 

 

가장 빨리 실행이 되는 Future만 가져오는 것을 볼 수 있다.

 

 

+ Recent posts