728x90

https://seungkyu-han.tistory.com/111

 

Future 인터페이스

자바에서 비동기 프로그래밍을 하기 위해 알아야 하는 Future 인터페이스에 대해 알아보자. Method reference :: 연산자를 이용해서 함수에 대한 참조를 간결하게 포현한 것이다. package org.example; import j

seungkyu-han.tistory.com

저번 내용을 읽어보면 도움이 될 것이다.

 

CompletionStage

public interface CompletionStage<T> {

    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);

    public CompletionStage<Void> thenAccept(Consumer<? super T> action);

    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

    public CompletionStage<Void> thenRun(Runnable action);

    public CompletionStage<Void> thenRunAsync(Runnable action);

    public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);

    public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
}

CompletionStage 인터페이스는 이렇게 구성이 되어 있다.

 

차례로 내려가면서 실행하기 때문에 각각 파이프 하나의 단계라고 생각하면 될 것이다.

저번 내용과 다르게, 결과를 직접적으로 가져올 수 없기 때문에 비동기 프로그래밍이 가능하게 된다.

또한 Non-blocking으로 프로그래밍하기 위해서는 별도의 쓰레드가 필요하다.

Completable은 내부적으로 FokJoinPool을 사용한다.

할당된 CPU 코어 - 1 에 해당하는 쓰레드를 관리하는 것이다.

이렇게 다른 쓰레드에서 실행이 되기 때문에 Non-blocking 프로그래밍도 가능하게 해준다.

 

CompletionStage와 함수형 인터페이스

저번에 배웠던 함수형 인터페이스와 관련하여 각각 CompletionStage와 연결된다.

Consumer - accept -> thenAccept(Consumer action) void 반환
Function - apply -> thenApply(Function fn) 다른 타입 반환
Function - compose -> thenCompose(Function fn) Completion의 결과값
Runnable - run -> thenRun(Runnable action) void 반환

 

thenAccept[Async]

  • Consumer를 파라미터로 받는다.
  • 이전 task로부터 값을 받지만, 값을 넘기지는 않는다.
  • 다음 task에게 null이 전달된다.
  • 값을 받아서 action만 하는 경우에 사용한다.
@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

 

해당 Consumer를 파라미터로 받는다.

 

    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
                                                 Executor executor);

 

thenAccept도 하나만 있는 것이 아니라, 3개가 존재한다.

일단 [Async]에 대해서만 확인해보자.

 

@Slf4j
public class A {

    //future 종료된 후에 반환
    public static CompletionStage<Integer> finishedStage() throws InterruptedException {
        var future = CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        });
        Thread.sleep(100);
        return future;
    }

    //future 종료되기 전에 반환
    public static CompletionStage<Integer> runningStage(){
        return CompletableFuture.supplyAsync(() -> {
            try{
                Thread.sleep(1000);
                log.info("I'm Running!");
            } catch (InterruptedException e){
                throw new RuntimeException(e);
            }
            return 1;
        });
    }

    public static void main(String[] args) throws InterruptedException {

        System.out.println("thenAccept");

        log.info("start thenAccept");
        CompletionStage<Integer> acceptStage = finishedStage();
        acceptStage.thenAccept(i -> {
            log.info("{} in thenAccept", i);
        }).thenAccept(i -> {
            log.info("{} in thenAccept2", i);
        });
        log.info("after thenAccept");

        System.out.println("thenAcceptAsync");

        log.info("start thenAcceptAsync");
        CompletionStage<Integer> acceptAsyncStage = finishedStage();
        acceptAsyncStage.thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync", i);
        }).thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync2", i);
        });
        log.info("after thenAccept");
    }
}

 

해당코드를 통해 알아보겠다.

종료된 Stage를 사용하여 thenAccept, thenAcceptAsync 2개를 확인해보았다.

 

일단 thenAccept는 반환값이 없기 때문에 2부터는 null이 찍히는 것을 볼 수 있다.

 

그리고 이 두개의 차이는 다음과 같다.

thenAccept는 Future가 완료된 상태라면, caller와 같은 쓰레드에서 실행이 된다.

그렇기 때문에 동기적으로 차례대로 시작된 것을 볼 수 있다.

thenAcceptAsync는 그냥 상태에 상관없이, 그냥 남는 쓰레드에서 실행이 된다.

 

해당 코드로 바꾸어 실행해보자.

        System.out.println("thenAccept Running");

        log.info("start thenAccept Running");
        CompletionStage<Integer> acceptRunningStage = runningStage();
        acceptRunningStage.thenAccept(i -> {
            log.info("{} in thenAccept Running", i);
        }).thenAccept(i -> {
            log.info("{} in thenAccept2 Running", i);
        });
        log.info("after thenAccept");

        Thread.sleep(2000);

        System.out.println("thenAcceptAsync");

        log.info("start thenAcceptAsync Running");
        CompletionStage<Integer> acceptAsyncRunningStage = runningStage();
        acceptAsyncRunningStage.thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync Running", i);
        }).thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync2 Running", i);
        });
        log.info("after thenAccept");

        Thread.sleep(2000);

 

해당 코드를 실행하면 다음과 같은 결과가 나온다.

 

위와는 다르게 Future가 종료되지 않았다면, thenAccept는 callee에서 실행이 되게 된다.

 

이를 통해, Async는 그냥 thread pool에서 가져와서 실행이 되며 Async가 아니면 stage의 상태에 따라 나뉘는 것을 볼 수 있다.

stage가 실행중이라면, 호출한 caller 쓰레드에서 실행이 된다.

stage가 실행중이 아니라면, 호출된 caller 쓰레드에서 실행이 되게 된다.

 

위에서 마지막에 있던 메서드 중에 파라미터가 하나 더 있던 메서드가 있었다.

executor를 넘겨주는 것인데, 해당 action이 실행될 쓰레드를 지정해주는 것이다.

@Slf4j
public class B {

    @SneakyThrows
    public static void main(String[] args) {
        var single = Executors.newSingleThreadExecutor();
        var fixed = Executors.newFixedThreadPool(10);

        log.info("start main");
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        });

        stage
                .thenAcceptAsync(i -> {
                    log.info("{} in thenAcceptAsync", i);
                }, fixed).thenAcceptAsync(i -> {
                    log.info("{} in thenAcceptAsync", i);
                }, single);

        log.info("after thenAccept");
        Thread.sleep(2000);

        single.shutdown();
        fixed.shutdown();
    }
}

 

해당 코드를 실행해보면

이렇게 지정된 쓰레드에서 실행이 되는 것을 볼 수 있다.

 

thenApply[Async]

  • Function를 파라미터로 받는다.
  • 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 값을 반환한다.
  • 다음 task에게 반환했던 값이 전달된다.
  • 값을 변형해서 전달해야 하는 경우 유용하다.
@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}

 

해당 Function을 파라미터로 받는다.

 

    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn);

    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor);

 

메서드가 다음과 같이 있다.

 

해당 메서드들을 하면서 확인해보자.

@Slf4j
public class C {
    public static void main(String[] args) throws InterruptedException {
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 1);

        stage.thenApplyAsync(value -> {
            var next = value + 1;
            log.info("in thenApplyAsync : {}", next);
            return next;
        }).thenApplyAsync(value -> {
            var next = "result: " + value;
            log.info("in thenApplyAsync2 : {}", next);
            return next;
        }).thenApplyAsync(value -> {
            var next = value.equals("result: 2");
            log.info("in thenApplyAsync3 : {}", next);
            return next;
        }).thenAcceptAsync(value -> {
            log.info("final: {}", value);
        });

        Thread.sleep(2000);
    }
}

 

해당 코드를 실행하면 다음과 같다.

 

thenApply는 다른 타입을 주고 받는 것이 가능한 것을 볼 수 있다.

 

thenCompose[Async]

  • Function를 파라미터로 받는다.
  • 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 CompletionStage를 반환한다.
  • 반환한 CompletionStage가 done 상태가 되면 값을 다음 task에 전달한다.
  • 다른 future를 반환해야 하는 경우 유용하다.
    public <U> CompletionStage<U> thenCompose
        (Function<? super T, ? extends CompletionStage<U>> fn);

    public <U> CompletionStage<U> thenComposeAsync
        (Function<? super T, ? extends CompletionStage<U>> fn);

    public <U> CompletionStage<U> thenComposeAsync
        (Function<? super T, ? extends CompletionStage<U>> fn,
         Executor executor);

 

그냥 Future를 반환하며, 해당 Future가 끝날 때까지 기다렸다가 준다는 것이다.

 

@Slf4j
public class D {

    public static CompletionStage<Integer> addValue(int number, int value){
        return CompletableFuture.supplyAsync(() -> {
            try{
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return number + value;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 1);

        stage.thenComposeAsync(value -> {
            var next = addValue(value, 1);
            log.info("in thenComposeAsync: {}", next);
            return next;
        }).thenComposeAsync(value -> {
            var next = CompletableFuture.supplyAsync(() -> {
                try{
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "result: " + value;
            });
            log.info("in thenComposeAsync: {}", next);
            return next;
        }).thenAcceptAsync(value -> log.info("{} in then AcceptAsync", value));

        Thread.sleep(2000);
    }
}

 

해당 코드를 실행해보면

 

다음과 같이 출력이 되는데, 마지막에 result: 2로 Future가 완료된 후의 값을 가져온 것을 볼 수 있다.

 

thenRun[Async]

  • Runnable을 파라미터로 받는다.
  • 이전 task로부터 값을 받지 않고 값을 반환하지 않는다.
  • 다음 task에게 null이 전달된다.
  • future가 완료되었다는 이벤트를 기록할 때 유용하다.

Runnable 인터페이스이다.

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

 

받는 것도 주는 것도 없는 것을 볼 수 있다.

 

당연히 큰 역할을 하기보다, 그냥 로그? 이벤트 기록 용으로 사용한다고 한다.

 

@Slf4j
public class E {

    public static void main(String[] args) throws InterruptedException {
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 1);

        stage.thenRunAsync(() -> log.info("in thenRunAsync"))
                .thenRunAsync(() -> log.info("in thenRunAsync2"))
                .thenAcceptAsync(value -> log.info("{} in thenAcceptAsync", value));

        Thread.sleep(2000);
    }
}

 

해당 코드를 실행해보면

 

그냥 주지도, 받지도 않는 것을 볼 수 있다.

 

exceptionally

  • Function을 파라미터로 받는다.
  • 이전 task에서 발생한 exception을 받아서 처리하고 값을 반환한다.
  • 다음 task에게 반환된 값을 전달한다.
  • future 파이프에서 발생한 에러를 처리할 때 유용하다.
    public CompletionStage<T> exceptionally
        (Function<Throwable, ? extends T> fn);

 

간단하게 해당 에러를 발생하는 코드를 만들어보면

@Slf4j
public class F {

    public static void main(String[] args) throws InterruptedException {
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 1);

        stage.thenApplyAsync(i -> {
            log.info("in then ApplyAsync");
            return i / 0;
        }).exceptionally(e -> {
            log.info("{} in exceptionally", e.getMessage());
            return 0;
        }).thenAcceptAsync(value -> {
            log.info("{} in thenAcceptAsync", value);
        });

        Thread.sleep(2000);
    }
}

 

이러한 결과가 나오는 것을 볼 수 있다.

+ Recent posts