728x90

그 다음은 HotPublisher이다.

 

소비자와 관계 없이 데이터를 한 번만 통지하는 것이 HotPublisher이다.

해당하는 데이터는 한 번만 통지하고, 소비자는 구독한 시점 이후에 통지된 데이터들만 전달 받을 수 있다.

 

바로 작성해보도록 하자.

Subscriber는 변함 없이 ColdPublisher에서 사용했던 코드를 그대로 사용한다.

 

  • Publisher

Cold와는 지속적으로 데이터를 전달한다.

@Slf4j
public class SimpleHotPublisher implements Flow.Publisher<Integer>{

    private final ExecutorService publisherExecutor = Executors.newSingleThreadExecutor();
    private final Future<Void> task;
    private List<Integer> numbers = new ArrayList<>();
    private List<SimpleHotSubscription> subscriptions = new ArrayList<>();

    public SimpleHotPublisher(){
        numbers.add(1);
        task = publisherExecutor.submit(() -> {
            for(int i = 2; !Thread.interrupted(); i++){
                numbers.add(i);
                subscriptions.forEach(SimpleHotSubscription::wakeup);
                Thread.sleep(100);
            }
            return null;
        });
    }

    public void shutdown(){
        this.task.cancel(true);
        publisherExecutor.shutdown();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        var subscription = new SimpleHotSubscription(subscriber);
        subscriber.onSubscribe(subscription);
        subscriptions.add(subscription);
    }

    private class SimpleHotSubscription implements Flow.Subscription{
        private int offset;
        private int requiredOffset;
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService subscriptionExecutorService = Executors.newSingleThreadExecutor();

        public SimpleHotSubscription(Flow.Subscriber<? super Integer> subscriber){
            int lastElementIndex = numbers.size() - 1;
            this.offset = lastElementIndex;
            this.requiredOffset = lastElementIndex;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            requiredOffset += n;

            onNextWhilePossible();
        }

        @Override
        public void cancel() {
            this.subscriber.onComplete();
            if(subscriptions.contains(this)){
                subscriptions.remove(this);
            }
            subscriptionExecutorService.shutdown();
        }

        public void wakeup(){
            onNextWhilePossible();
        }

        private void onNextWhilePossible(){
            subscriptionExecutorService.submit(() -> {
                while (offset < requiredOffset && offset < numbers.size()){
                    var item = numbers.get(offset);
                    subscriber.onNext(item);
                    offset++;
                }
            });
        }

    }
}

 

해당 코드를 실행해보자.

 

 

이렇게 시점에 따라 전달받는 데이터가 다른 것을 볼 수 있다.

 

 

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

RxJava Reactor  (2) 2024.03.14
Project reactor  (0) 2024.03.14
ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
728x90

Publisher 중에서 ColdPublisher를 구현해보자.

 

ColdPublisher에 대해 먼저 알아보자.

 

소비자가 구독 할 때마다 데이터를 처음부터 새로 통지하는 것을 ColdPublisher라고 한다.

소비자는 구독 시점과 상관 없이 통지된 데이터를 처음부터 전달 받게 된다.

 

모든 소비자가 같은 데이터를 전달받게 될 것이다.

 

간단하게 한 번 구현해보자.

 

  • Publisher, Subscription

우선 Publisher와 Subscription이다.

public class SimpleColdPublisher implements Flow.Publisher<Integer>{

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        var iterator = Collections.synchronizedList(
                IntStream.range(1, 10).boxed().collect(Collectors.toList())
        ).iterator();
        var subscription = new SimpleColdSubscription(iterator, subscriber);
        subscriber.onSubscribe(subscription);
    }

    public class SimpleColdSubscription implements Flow.Subscription{

        private final Iterator<Integer> iterator;
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        public SimpleColdSubscription(Iterator<Integer> iterator, Flow.Subscriber<? super Integer> subscriber) {
            this.iterator = iterator;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            executor.submit(() -> {
               for(int i = 0; i < n; i++){
                   if(iterator.hasNext()){
                       var number = iterator.next();
                       iterator.remove();
                       subscriber.onNext(number);
                   }
                   else{
                       subscriber.onComplete();
                       executor.shutdown();
                       break;
                   }
               }
            });
        }

        @Override
        public void cancel() {
            subscriber.onComplete();
        }
    }
}

 

ColdPublisher답게 전달할 데이터의 배열을 넘겨주게 된다.

 

해당 iterator의 데이터를 전부 전달해주고 종료된다.

 

 

  • Subscriber

그 다음은 Subscriber이다.

subscription을 인자로 받는 onSubscribe가 있으며, onNext를 호출 할 때마다 request로 데이터를 하나씩 받아온다.

 

@Slf4j
public class SimpleNamedSubscriber<T> implements Flow.Subscriber<T> {

    private Flow.Subscription subscription;
    private final String name;

    public SimpleNamedSubscriber(String name){
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
        log.info("onSubscribe");
    }

    @Override
    public void onNext(T item) {
        log.info("name: {}, onNext: {}", name, item);
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("onError: {}", throwable.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("onComplete");
    }
    
    public void cancel(){
        log.info("cancel");
        this.subscription.cancel();
    }
}

 

 

해당 코드를 실행해보자.

public class SimpleColdPublisherMain {

    @SneakyThrows
    public static void main(String[] args) {
        //create Publisher
        var publisher = new SimpleColdPublisher();

        var subscriber1 = new SimpleNamedSubscriber<Integer>("subscriber1");
        publisher.subscribe(subscriber1);

        Thread.sleep(5000);

        var subscriber2 = new SimpleNamedSubscriber<Integer>("subscriber2");
        publisher.subscribe(subscriber2);
    }
}

 

다음과 같은 코드로 Publisher와 Subscriber를 테스트 해보았다.

 

 

이렇게 subscriber1, subscriber2가 다른 시간에 subscribe를 했음에도 같은 결과가 나오는 것을 볼 수 있다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
CompletableFuture 인터페이스  (1) 2024.03.06
728x90

이번에 리액티브 프로그래밍을 공부하다가, 이런 용어들이 나왔다.

중간에 흐름을 조절해 준다는 역할을 한다고 하는데, 이해가 잘 되지 않아 일단 정리해 보고 넘어가려 한다.

 

 

우선 이해하기 쉽게 그림이다.

 

  • Publisher

Publisher가 Subscriber를 Subscribe한다.

Publisher는 request를 받으면 데이터를 생성하여 보낸다.

  • Subscriber

Subscriber가 Subscription을 onSubscribe 한다.

Subscriber는 필요할 때 Subscribe의 request를 통해 Publisher에게 데이터를 요청한다.

Subscriber는 onNext로 데이터를 받는다.

  • Subscription

Subscription은 Subscriber에 의해 등록된다.

 

 

모든 요청이 성공적으로 완료된다면 onComplete를, 요청이 실패하면 onError를 호출하고 흐름을 종료한다.

 

 

당연히 Publisher는 여러 개의 Subscriber를 Subscribe 가능하다.

 

하나씩 살펴보자

 

Publisher

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

 

하나의 메서드 밖에 없다.

subscribe로 Subscriber를 등록하면 된다.

 

Subscription

public static interface Subscription {
    public void request(long n);

    public void cancel();
}

 

 

  • requst

Subscriber가 데이터를 처리 가능 할 때 request를 호출한다.

파라미터 n은 Publisher에게 요청하는 데이터의 개수이다.

  • cancel

Publisher에게 데이터를 그만 보내라고 요청하는 메서드이다.

 

Subscriber

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    
    public void onNext(T item);

    public void onError(Throwable throwable);

    public void onComplete();
}

 

  • onSubscribe

Subscription을 파라미터로 받아 request를 호출한다.

Subscription의 request를 호출하는 것은 온전히 Subscriber의 결정이며, 호출되기 전까지는 어떤 데이터도 흐르지 않는다.

  • onNext

Publisher가 보낸 데이터이다.

  • onError

에러로 종료

  • onComplete

성공적으로 종료

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
CompletableFuture 인터페이스  (1) 2024.03.06
CompletionStage 인터페이스  (1) 2024.03.05
728x90

지금까지 배운 것을 바탕으로 성능을 개선해보자.

 

동기적으로 작성된 코드를 비동기적으로 변경하는 것이다.

현재 동기적으로 작성된 코드는 다음과 같다.

public Optional<User> getUserById(String id){
        return userRepository.findById(id)
                .map(user -> {
                    var image = imageRepository.findById(user.getProfileImageId())
                            .map(imageEntity -> {
                                return new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl());
                            });

                    var articles = articleRepository.findAllByUserId(user.getId())
                            .stream().map(articleEntity ->
                                    new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())).toList();

                    var followCount = followRepository.countByUserID(user.getId());

                    return new User(
                            user.getId(),
                            user.getName(),
                            user.getAge(),
                            image,
                            articles,
                            followCount
                    );
                });
    }

 

Repository에서 가져올 때마다 1초 정도 시간이 걸린다고 생각하여 1초 정도 Thread.sleep을 사용했고, 조회한 값들을 다른 객체에 넣어서 반환하는 메서드이다.

 

차례대로 1초씩 3번 호출하기 때문에 적어도 3초 이상의 시간이 소모될 것이다.

 

    void testGetUser(){
        //given
        String userId = "1234";

        //when
        Optional<User> optionalUser = userBlockingService.getUserById(userId);

        //then
        assertFalse(optionalUser.isEmpty());
        var user = optionalUser.get();
        assertEquals(user.getName(), "sk");

        assertFalse(user.getProfileImage().isEmpty());
        assertFalse(user.getProfileImage().isEmpty());
        var image = user.getProfileImage().get();
        assertEquals(image.getId(), "image#1000");
        assertEquals(image.getName(), "profileImage");
        assertEquals(image.getUrl(), "https://avatars.githubusercontent.com/u/98071131?s=400&u=9107a0b50b52da5bbc8528157eed1cca34feb3c5&v=4");

        assertEquals(2, user.getArticleList().size());

        assertEquals(1000, user.getFollowCount());
    }

 

해당 테스트 코드의 시간을 확인 했을 때 4sec 74ms가 소모되었다.

 

이제 변경해보도록 하자.

일단 각 Repository를 CompletableFuture을 반환하도록 메서드를 변경했다.

    @SneakyThrows
    public CompletableFuture<Optional<UserEntity>> findById(String userId){
        return CompletableFuture.supplyAsync(() -> {
            log.info("UserRepository.findById: {}", userId);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            var user = userMap.get(userId);
            return Optional.ofNullable(user);
        });
    }

 

 

현재 에러만 잡아서 변경해놓은 코드는 다음과 같다.

    @SneakyThrows
    public Optional<User> getUserById(String id){
        return userRepository.findById(id).get()
                .map(this::getUser);
    }

    @SneakyThrows
    private User getUser(UserEntity user){
        var image = imageRepository.findById(user.getProfileImageId()).get()
                .map(imageEntity -> {
                    return new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl());
                });

        var articles = articleRepository.findAllByUserId(user.getId()).get()
                .stream().map(articleEntity ->
                        new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())).toList();

        var followCount = followRepository.countByUserID(user.getId()).get();

        return new User(
                user.getId(),
                user.getName(),
                user.getAge(),
                image,
                articles,
                followCount
        );
    }

 

이거를 Repository에 접근 할 때마다 CompletableFuture를 사용했다.

 

    @SneakyThrows
    public CompletableFuture<Optional<User>> getUserById(String id){
        return userRepository.findById(id)
                .thenCompose(this::getUser);
    }

    @SneakyThrows
    private CompletableFuture<Optional<User>> getUser(Optional<UserEntity> userEntityOptional) {
        if (userEntityOptional.isEmpty()) {
            return CompletableFuture.completedFuture(Optional.empty());
        }

        var userEntity = userEntityOptional.get();

        var imageFuture = imageRepository.findById(userEntity.getProfileImageId())
                .thenApplyAsync(imageEntityOptional ->
                        imageEntityOptional.map(imageEntity ->
                                new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl())
                        )
                );


        var articlesFuture = articleRepository.findAllByUserId(userEntity.getId())
                .thenApplyAsync(articleEntities ->
                        articleEntities.stream()
                                .map(articleEntity ->
                                        new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())
                                )
                                .collect(Collectors.toList())
                );

        var followCountFuture = followRepository.countByUserID(userEntity.getId());

        return CompletableFuture.allOf(imageFuture, articlesFuture, followCountFuture)
                .thenAcceptAsync(v -> {
                    log.info("Three futures are completed");
                })
                .thenRunAsync(() -> {
                    log.info("Three futures are also completed");
                })
                .thenApplyAsync(v -> {
                    try {
                        var image = imageFuture.get();
                        var articles = articlesFuture.get();
                        var followCount = followCountFuture.get();

                        return Optional.of(
                                new User(
                                        userEntity.getId(),
                                        userEntity.getName(),
                                        userEntity.getAge(),
                                        image,
                                        articles,
                                        followCount
                                )
                        );
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
    }
}

 

이렇게 변경하고 전에 사용했던 테스트 코드의 시간을 측정해보았다.

 

거의 반으로 줄어든 2sec 66ms가 나왔다.

 

전 코드에서 Repository에 접근 하는 것을 기다리기 보다 비동기적으로 실행한다면 시간을 크게 줄일 수 있는 것을 볼 수 있었던 것 같다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture 인터페이스  (1) 2024.03.06
CompletionStage 인터페이스  (1) 2024.03.05
Future 인터페이스  (1) 2024.01.09
728x90

CompletableFuture 클래스

우선 주요 메서드 먼저 살펴보고 가도록 하자.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(ASYNC_POOL, supplier);
    }

	public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(ASYNC_POOL, runnable);
    }
    
    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }
    
    public boolean isCompletedExceptionally() {
        Object r;
        return ((r = result) instanceof AltResult) && r != NIL;
    }
    
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
    
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        int n; Object r;
        if ((n = cfs.length) <= 1)
            return (n == 0)
                ? new CompletableFuture<Object>()
                : uniCopyStage(cfs[0]);
        for (CompletableFuture<?> cf : cfs)
            if ((r = cf.result) != null)
                return new CompletableFuture<Object>(encodeRelay(r));
        cfs = cfs.clone();
        CompletableFuture<Object> d = new CompletableFuture<>();
        for (CompletableFuture<?> cf : cfs)
            cf.unipush(new AnyOf(d, cf, cfs));
        // If d was completed while we were adding completions, we should
        // clean the stack of any sources that may have had completions
        // pushed on their stack after d was completed.
        if (d.result != null)
            for (int i = 0, len = cfs.length; i < len; i++)
                if (cfs[i].result != null)
                    for (i++; i < len; i++)
                        if (cfs[i].result == null)
                            cfs[i].cleanStack();
        return d;
    }
}

 

 

supplyAsync

 

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(ASYNC_POOL, supplier);
    }

 

이렇게 구성이 되어 있었다.

 

보면 알 수 있듯이, 파라미터를 받지 않고도 결과를 만들어서 다음 task에 전달해준다.

 

runAsync

 

	public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(ASYNC_POOL, runnable);
    }

 

Runnable과 비슷하다.

값을 받지도, 값을 리턴하지도 않고 수행만 하게 된다.

 

complete

 

CompletableFuture가 완료되지 않았다면 주어진 값으로 채운다.

리턴되는 Boolean은 complete에 의해 상태가 바뀌었다면 true, 아니라면 false를 반환한다.

    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }

 

isCompletedExceptionally

 

CompletableFuture가 에러로 인해 중지가 되었는지 Boolean으로 반환하는 메서드이다.

var futureWithException = CompletableFuture.supplyAsync(() -> 1 / 0);

Thread.sleep(1000);

assert futureWithException.isDone();
assert futureWithException.isCompletedExceptionally();

 

이런식의 코드를 작성하여 확인 할 수 있다.

 

allOf

 

여러개의 CompletableFuture를 모아서 하나의 CompletableFuture로 변환할 수 있다.

모든 CompletableFuture가 완료되면 상태가 done으로 변경된다.

반환하는 값은 없기 때문에 각각의 값에 다시 접근하여 get으로 값을 가져와야 한다.

 

allOf를 테스트 해보기 위해 코드를 작성해서 확인해보자.

@Slf4j
public class A {

    @SneakyThrows
    public static void main(String[] args) {

        var startTime = System.currentTimeMillis();

        var firstFuture = waitAndReturn(100, 1);
        var secondFuture = waitAndReturn(500, 2);
        var thirdFuture = waitAndReturn(1000, 3);

        CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture)
                .thenAcceptAsync(v -> {
                    try{
                        log.info("first: {}", firstFuture.get());
                        log.info("second: {}", secondFuture.get());
                        log.info("third: {}", thirdFuture.get());
                    }catch (Exception e){
                        throw new RuntimeException(e);
                    }
                }).join();

        var endTime = System.currentTimeMillis();

        log.info("time: {}", endTime - startTime);
    }

    public static CompletableFuture<Integer> waitAndReturn(int time, int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return value;
        });
    }
}

 

해당 코드를 실행하면 다음과 같이 나온다.

 

차례차례 실행이 된 100 + 500 + 1000이 아닌 1000에 가까운 값이 나오는 것을 볼 수 있다.

 

anyOf

 

allOf와는 다르게 가장 먼저 끝난 Future의 값을 제공해준다.

 

방금과 비슷한 코드를 실행해보면

@Slf4j
public class A {

    @SneakyThrows
    public static void main(String[] args) {

        var startTime = System.currentTimeMillis();

        var firstFuture = waitAndReturn(100, 1);
        var secondFuture = waitAndReturn(500, 2);
        var thirdFuture = waitAndReturn(1000, 3);

        CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture)
                .thenAcceptAsync(v -> {
                    try{
                        log.info("Hi FirstValue");
                        log.info("value: {}", v);
                    }catch (Exception e){
                        throw new RuntimeException(e);
                    }
                }).join();

        var endTime = System.currentTimeMillis();

        log.info("time: {}", endTime - startTime);
    }

    public static CompletableFuture<Integer> waitAndReturn(int time, int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return value;
        });
    }
}

 

 

가장 빨리 실행이 되는 Future만 가져오는 것을 볼 수 있다.

 

 

728x90

https://seungkyu-han.tistory.com/111

 

Future 인터페이스

자바에서 비동기 프로그래밍을 하기 위해 알아야 하는 Future 인터페이스에 대해 알아보자. Method reference :: 연산자를 이용해서 함수에 대한 참조를 간결하게 포현한 것이다. package org.example; import j

seungkyu-han.tistory.com

저번 내용을 읽어보면 도움이 될 것이다.

 

CompletionStage

public interface CompletionStage<T> {

    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);

    public CompletionStage<Void> thenAccept(Consumer<? super T> action);

    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

    public CompletionStage<Void> thenRun(Runnable action);

    public CompletionStage<Void> thenRunAsync(Runnable action);

    public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);

    public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
}

CompletionStage 인터페이스는 이렇게 구성이 되어 있다.

 

차례로 내려가면서 실행하기 때문에 각각 파이프 하나의 단계라고 생각하면 될 것이다.

저번 내용과 다르게, 결과를 직접적으로 가져올 수 없기 때문에 비동기 프로그래밍이 가능하게 된다.

또한 Non-blocking으로 프로그래밍하기 위해서는 별도의 쓰레드가 필요하다.

Completable은 내부적으로 FokJoinPool을 사용한다.

할당된 CPU 코어 - 1 에 해당하는 쓰레드를 관리하는 것이다.

이렇게 다른 쓰레드에서 실행이 되기 때문에 Non-blocking 프로그래밍도 가능하게 해준다.

 

CompletionStage와 함수형 인터페이스

저번에 배웠던 함수형 인터페이스와 관련하여 각각 CompletionStage와 연결된다.

Consumer - accept -> thenAccept(Consumer action) void 반환
Function - apply -> thenApply(Function fn) 다른 타입 반환
Function - compose -> thenCompose(Function fn) Completion의 결과값
Runnable - run -> thenRun(Runnable action) void 반환

 

thenAccept[Async]

  • Consumer를 파라미터로 받는다.
  • 이전 task로부터 값을 받지만, 값을 넘기지는 않는다.
  • 다음 task에게 null이 전달된다.
  • 값을 받아서 action만 하는 경우에 사용한다.
@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

 

해당 Consumer를 파라미터로 받는다.

 

    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
                                                 Executor executor);

 

thenAccept도 하나만 있는 것이 아니라, 3개가 존재한다.

일단 [Async]에 대해서만 확인해보자.

 

@Slf4j
public class A {

    //future 종료된 후에 반환
    public static CompletionStage<Integer> finishedStage() throws InterruptedException {
        var future = CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        });
        Thread.sleep(100);
        return future;
    }

    //future 종료되기 전에 반환
    public static CompletionStage<Integer> runningStage(){
        return CompletableFuture.supplyAsync(() -> {
            try{
                Thread.sleep(1000);
                log.info("I'm Running!");
            } catch (InterruptedException e){
                throw new RuntimeException(e);
            }
            return 1;
        });
    }

    public static void main(String[] args) throws InterruptedException {

        System.out.println("thenAccept");

        log.info("start thenAccept");
        CompletionStage<Integer> acceptStage = finishedStage();
        acceptStage.thenAccept(i -> {
            log.info("{} in thenAccept", i);
        }).thenAccept(i -> {
            log.info("{} in thenAccept2", i);
        });
        log.info("after thenAccept");

        System.out.println("thenAcceptAsync");

        log.info("start thenAcceptAsync");
        CompletionStage<Integer> acceptAsyncStage = finishedStage();
        acceptAsyncStage.thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync", i);
        }).thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync2", i);
        });
        log.info("after thenAccept");
    }
}

 

해당코드를 통해 알아보겠다.

종료된 Stage를 사용하여 thenAccept, thenAcceptAsync 2개를 확인해보았다.

 

일단 thenAccept는 반환값이 없기 때문에 2부터는 null이 찍히는 것을 볼 수 있다.

 

그리고 이 두개의 차이는 다음과 같다.

thenAccept는 Future가 완료된 상태라면, caller와 같은 쓰레드에서 실행이 된다.

그렇기 때문에 동기적으로 차례대로 시작된 것을 볼 수 있다.

thenAcceptAsync는 그냥 상태에 상관없이, 그냥 남는 쓰레드에서 실행이 된다.

 

해당 코드로 바꾸어 실행해보자.

        System.out.println("thenAccept Running");

        log.info("start thenAccept Running");
        CompletionStage<Integer> acceptRunningStage = runningStage();
        acceptRunningStage.thenAccept(i -> {
            log.info("{} in thenAccept Running", i);
        }).thenAccept(i -> {
            log.info("{} in thenAccept2 Running", i);
        });
        log.info("after thenAccept");

        Thread.sleep(2000);

        System.out.println("thenAcceptAsync");

        log.info("start thenAcceptAsync Running");
        CompletionStage<Integer> acceptAsyncRunningStage = runningStage();
        acceptAsyncRunningStage.thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync Running", i);
        }).thenAcceptAsync(i -> {
            log.info("{} in thenAcceptAsync2 Running", i);
        });
        log.info("after thenAccept");

        Thread.sleep(2000);

 

해당 코드를 실행하면 다음과 같은 결과가 나온다.

 

위와는 다르게 Future가 종료되지 않았다면, thenAccept는 callee에서 실행이 되게 된다.

 

이를 통해, Async는 그냥 thread pool에서 가져와서 실행이 되며 Async가 아니면 stage의 상태에 따라 나뉘는 것을 볼 수 있다.

stage가 실행중이라면, 호출한 caller 쓰레드에서 실행이 된다.

stage가 실행중이 아니라면, 호출된 caller 쓰레드에서 실행이 되게 된다.

 

위에서 마지막에 있던 메서드 중에 파라미터가 하나 더 있던 메서드가 있었다.

executor를 넘겨주는 것인데, 해당 action이 실행될 쓰레드를 지정해주는 것이다.

@Slf4j
public class B {

    @SneakyThrows
    public static void main(String[] args) {
        var single = Executors.newSingleThreadExecutor();
        var fixed = Executors.newFixedThreadPool(10);

        log.info("start main");
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        });

        stage
                .thenAcceptAsync(i -> {
                    log.info("{} in thenAcceptAsync", i);
                }, fixed).thenAcceptAsync(i -> {
                    log.info("{} in thenAcceptAsync", i);
                }, single);

        log.info("after thenAccept");
        Thread.sleep(2000);

        single.shutdown();
        fixed.shutdown();
    }
}

 

해당 코드를 실행해보면

이렇게 지정된 쓰레드에서 실행이 되는 것을 볼 수 있다.

 

thenApply[Async]

  • Function를 파라미터로 받는다.
  • 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 값을 반환한다.
  • 다음 task에게 반환했던 값이 전달된다.
  • 값을 변형해서 전달해야 하는 경우 유용하다.
@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}

 

해당 Function을 파라미터로 받는다.

 

    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn);

    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor);

 

메서드가 다음과 같이 있다.

 

해당 메서드들을 하면서 확인해보자.

@Slf4j
public class C {
    public static void main(String[] args) throws InterruptedException {
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 1);

        stage.thenApplyAsync(value -> {
            var next = value + 1;
            log.info("in thenApplyAsync : {}", next);
            return next;
        }).thenApplyAsync(value -> {
            var next = "result: " + value;
            log.info("in thenApplyAsync2 : {}", next);
            return next;
        }).thenApplyAsync(value -> {
            var next = value.equals("result: 2");
            log.info("in thenApplyAsync3 : {}", next);
            return next;
        }).thenAcceptAsync(value -> {
            log.info("final: {}", value);
        });

        Thread.sleep(2000);
    }
}

 

해당 코드를 실행하면 다음과 같다.

 

thenApply는 다른 타입을 주고 받는 것이 가능한 것을 볼 수 있다.

 

thenCompose[Async]

  • Function를 파라미터로 받는다.
  • 이전 task로부터 T 타입의 값을 받아서 가공하고 U 타입의 CompletionStage를 반환한다.
  • 반환한 CompletionStage가 done 상태가 되면 값을 다음 task에 전달한다.
  • 다른 future를 반환해야 하는 경우 유용하다.
    public <U> CompletionStage<U> thenCompose
        (Function<? super T, ? extends CompletionStage<U>> fn);

    public <U> CompletionStage<U> thenComposeAsync
        (Function<? super T, ? extends CompletionStage<U>> fn);

    public <U> CompletionStage<U> thenComposeAsync
        (Function<? super T, ? extends CompletionStage<U>> fn,
         Executor executor);

 

그냥 Future를 반환하며, 해당 Future가 끝날 때까지 기다렸다가 준다는 것이다.

 

@Slf4j
public class D {

    public static CompletionStage<Integer> addValue(int number, int value){
        return CompletableFuture.supplyAsync(() -> {
            try{
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return number + value;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 1);

        stage.thenComposeAsync(value -> {
            var next = addValue(value, 1);
            log.info("in thenComposeAsync: {}", next);
            return next;
        }).thenComposeAsync(value -> {
            var next = CompletableFuture.supplyAsync(() -> {
                try{
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "result: " + value;
            });
            log.info("in thenComposeAsync: {}", next);
            return next;
        }).thenAcceptAsync(value -> log.info("{} in then AcceptAsync", value));

        Thread.sleep(2000);
    }
}

 

해당 코드를 실행해보면

 

다음과 같이 출력이 되는데, 마지막에 result: 2로 Future가 완료된 후의 값을 가져온 것을 볼 수 있다.

 

thenRun[Async]

  • Runnable을 파라미터로 받는다.
  • 이전 task로부터 값을 받지 않고 값을 반환하지 않는다.
  • 다음 task에게 null이 전달된다.
  • future가 완료되었다는 이벤트를 기록할 때 유용하다.

Runnable 인터페이스이다.

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

 

받는 것도 주는 것도 없는 것을 볼 수 있다.

 

당연히 큰 역할을 하기보다, 그냥 로그? 이벤트 기록 용으로 사용한다고 한다.

 

@Slf4j
public class E {

    public static void main(String[] args) throws InterruptedException {
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 1);

        stage.thenRunAsync(() -> log.info("in thenRunAsync"))
                .thenRunAsync(() -> log.info("in thenRunAsync2"))
                .thenAcceptAsync(value -> log.info("{} in thenAcceptAsync", value));

        Thread.sleep(2000);
    }
}

 

해당 코드를 실행해보면

 

그냥 주지도, 받지도 않는 것을 볼 수 있다.

 

exceptionally

  • Function을 파라미터로 받는다.
  • 이전 task에서 발생한 exception을 받아서 처리하고 값을 반환한다.
  • 다음 task에게 반환된 값을 전달한다.
  • future 파이프에서 발생한 에러를 처리할 때 유용하다.
    public CompletionStage<T> exceptionally
        (Function<Throwable, ? extends T> fn);

 

간단하게 해당 에러를 발생하는 코드를 만들어보면

@Slf4j
public class F {

    public static void main(String[] args) throws InterruptedException {
        CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 1);

        stage.thenApplyAsync(i -> {
            log.info("in then ApplyAsync");
            return i / 0;
        }).exceptionally(e -> {
            log.info("{} in exceptionally", e.getMessage());
            return 0;
        }).thenAcceptAsync(value -> {
            log.info("{} in thenAcceptAsync", value);
        });

        Thread.sleep(2000);
    }
}

 

이러한 결과가 나오는 것을 볼 수 있다.

728x90

자바에서 비동기 프로그래밍을 하기 위해 알아야 하는 Future 인터페이스에 대해 알아보자.

 

Method reference

:: 연산자를 이용해서 함수에 대한 참조를 간결하게 포현한 것이다.

package org.example;

import java.util.function.Consumer;
import java.util.stream.Stream;

public class Main {

    public static class Student {
        private final String name;

        public Student(String name) {
            this.name = name;
        }

        public boolean compareTo(Student student) {
            return student.name.compareTo(name) > 0;
        }

        public String getName() {
            return name;
        }
    }

    public static void print(String name) {
        System.out.println(name);
    }

    public static void main(String[] args) {
        var target = new Student("f");

        Consumer<String> staticPrint = Main::print;

        Stream.of("a", "b", "k", "z")
                .map(Student::new)
                .filter(target::compareTo)
                .map(Student::getName)
                .forEach(staticPrint);
    }
}

위의 코드를 예시로 들면

method reference: target::compareTo

static method reference: Main::print

instance method reference: Student::getName

constructor method reference: Student::new

이렇게 해당된다.

 

ExecutorService

쓰레드 풀을 이용하여 비동기적으로 작업을 실행하고 관리해준다.

쓰레드를 생성하고 관리하는 작업이 필요하지 않기 때문에, 코드를 간결하게 유지가 가능하다.

public interface ExecutorService extends Executor {
    void shutdown();

    <T> Future<T> submit(Callable<T> task);

    void execute(Runnable command);
}

이렇게 구성이 되어 있으며 각 메서드는 다음과 같이 동작한다.

execute: Runnable 인터페이스를 구현한 작업을 쓰레드 풀의 쓰레드에서 비동기적으로 실행한다.

submit: Callable 인터페이스를 구현한 작업을 쓰레드 풀에서 비동기적으로 실행하고, 해당 작업의 결과를 Future<T> 객체로 반환한다.

shutdown: ExecutorService를 종료하며, 더 이상의 task를 받지 않는다.

 

Executors를 사용하여 ExecutorService를 생성한다.

  • newSingleThreadExecutor: 단일 쓰레드로 구성된 쓰레드 풀을 생성, 한 번에 하나의 작업만 실행
  • newFixedThreadPool: 고정된 크기의 쓰레드 풀을 생성. 크기는 인자로 주어진 n과 동일
  • newCachedThreadPool: 사용가능한 쓰레드가 없다면 생성, 있다면 재사용하며 쓰레드가 일정시간 사용되지 않으면 회수
  • newScheduledThreadPool: 스케줄링 기능을 갖춘 고정 크기의 쓰레드 풀을 생성. 주기적이거나 지연이 발생하는 작업을 실행
  • newWorkStealingPool: work steal 알고리즘을 사용하는 ForkJoinPool을 생성

 

 

Future

이제 Future 인터페이스에 대하여 자세히 살펴보자.

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future 인터페이스는 이렇게 구성되어 있다.

 

  • isDone: task가 완료되었다면, true 반환
  • isCancelled: task가 cancel에 의해 취소된 경우, true 반환
  • get: 결과를 구할 때까지 thread가 계속 block(future가 오래 걸린다면 thread가 blocking 상태 유지), 이 문제를 해결하기 위해 timeout의 인자를 받는 메서드가 존재
  • cancel: future의 작업을 취소하며, 취소할 수 없는 상황이면 false를 반환한다. mayInterruptIfRunning이 false라면 시작하지 않은 작업만 취소

 

이렇게 Future에 대해서 알아보았는 데, cancel로 정지시키는 거 말고는 future를 컨트롤 할 수가 없다.

또한 반환된 결과를 get으로 기다린 후 접근하기 때문에 비동기로 작업하기가 어렵다.

728x90

https://seungkyu-han.tistory.com/109

 

동기, 비동기란 무엇일까?

스프링을 공부하기 전에 항상 나오는 말이 있다. 스프링은 동기, node 서버는 비동기라는 말을 굉장히 많이 들었던 것 같다. 처음에 공부할 때는 외우고만 있다가 해당 내용을 운영체제에서 공부

seungkyu-han.tistory.com

저번에 작성한 동기, 비동기와 연결이 되는 글이다.

 

앞서 작성했던 프로그램 A와 프로그램 B의 공통점은 둘 다 호출하는 main 함수가 getValue 함수가 시행되는 동안 Blocking 상태가 된다는 것이다.

각각 프로그램이 실행되는 모습을 그려보면 다음과 같다.

프로그램 A
프로그램 B

 

모두 메인 함수가 Blocking 되는 것을 볼 수 있다.

그렇기 때문에 프로그램 A는 동기이면서 Blocking, 프로그램 B는 비동기이면서 Non-blocking 프로그램이다.

 

그럼 과연 이런 모델들만 있을까?

물론 아니다.

다음 프로그램들을 보자.

package org.example;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ProgramC {

    public static void myLog(String string){
        System.out.println(Thread.currentThread() + " -> " +string);
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        myLog("Start main");

        var count = 1;
        Future<Integer> result = getValue();
        while (!result.isDone()){
            myLog("Waiting for result, count: " + count++);
            Thread.sleep(1000);
        }

        var nextValue = result.get() + 1;
        myLog(String.valueOf(nextValue == 1));

        myLog("Finish main");
    }

    public static Future<Integer> getValue(){
        var executor = Executors.newSingleThreadExecutor();
        try{
            return executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    myLog("Start getValue");
                    try{
                        Thread.sleep(1000);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }

                    var result = 0;
                    try {
                        return result;
                    }finally {
                        myLog("Finish getResult");
                    }
                }
            });
        }finally {
            executor.shutdown();
        }
    }
}

 

해당 코드에서는 준비가 되기 전까지 계속 메인에서 코드를 실행하게 된다.

 

그리고 결과를 보면 메인 스레드 외에도 다른 스레드가 실행되는 것을 볼 수 있다.

 

프로그램 C

메인 스레드는 getValue 스레드가 동작하는 중간에도 계속 동작하는 것을 볼 수 있다.

 

해당 프로그램은 동기이면서 Non-blocking 프로그램이다.

 

마지막으로 비동기이면서 Non-blocking 프로그램은 다음과 같을 것이다.

caller 스레드는 반환값이 필요가 없기에 바로 종료될 것이며, callee 본인의 일만 본인의 스레드에서 하게 될 것이다.

package org.example;

import java.util.concurrent.Executors;

public class ProgramD {
    public static void myLog(String string){
        System.out.println(Thread.currentThread() + " -> " +string);
    }

    public static void main(String[] args) {
        myLog("Start main");
        getValue(new Function<Integer>() {
            @Override
            public void accept(Integer integer) {
                var nextValue = integer + 1;
                myLog(String.valueOf(nextValue == 1));
            }
        });
        myLog("Finish main");
    }

    public static void getValue(Function<Integer> callback){
        var executor = Executors.newSingleThreadExecutor();
        try{
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    myLog("Start getValue");
                    try{
                        Thread.sleep(1000);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }

                    var value = 0;
                    try{
                        callback.accept(value);
                    }finally {
                        myLog("Finish getResult");
                    }
                }
            });
        }finally {
            executor.shutdown();
        }
    }
}

 

이렇게 메인 스레드는 시작하자마자 종료가 되고, 남은 스레드 혼자 실행이 되고 있는 것을 볼 수가 있다.

+ Recent posts