728x90

지금까지 배운 것을 바탕으로 성능을 개선해보자.

 

동기적으로 작성된 코드를 비동기적으로 변경하는 것이다.

현재 동기적으로 작성된 코드는 다음과 같다.

public Optional<User> getUserById(String id){
        return userRepository.findById(id)
                .map(user -> {
                    var image = imageRepository.findById(user.getProfileImageId())
                            .map(imageEntity -> {
                                return new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl());
                            });

                    var articles = articleRepository.findAllByUserId(user.getId())
                            .stream().map(articleEntity ->
                                    new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())).toList();

                    var followCount = followRepository.countByUserID(user.getId());

                    return new User(
                            user.getId(),
                            user.getName(),
                            user.getAge(),
                            image,
                            articles,
                            followCount
                    );
                });
    }

 

Repository에서 가져올 때마다 1초 정도 시간이 걸린다고 생각하여 1초 정도 Thread.sleep을 사용했고, 조회한 값들을 다른 객체에 넣어서 반환하는 메서드이다.

 

차례대로 1초씩 3번 호출하기 때문에 적어도 3초 이상의 시간이 소모될 것이다.

 

    void testGetUser(){
        //given
        String userId = "1234";

        //when
        Optional<User> optionalUser = userBlockingService.getUserById(userId);

        //then
        assertFalse(optionalUser.isEmpty());
        var user = optionalUser.get();
        assertEquals(user.getName(), "sk");

        assertFalse(user.getProfileImage().isEmpty());
        assertFalse(user.getProfileImage().isEmpty());
        var image = user.getProfileImage().get();
        assertEquals(image.getId(), "image#1000");
        assertEquals(image.getName(), "profileImage");
        assertEquals(image.getUrl(), "https://avatars.githubusercontent.com/u/98071131?s=400&u=9107a0b50b52da5bbc8528157eed1cca34feb3c5&v=4");

        assertEquals(2, user.getArticleList().size());

        assertEquals(1000, user.getFollowCount());
    }

 

해당 테스트 코드의 시간을 확인 했을 때 4sec 74ms가 소모되었다.

 

이제 변경해보도록 하자.

일단 각 Repository를 CompletableFuture을 반환하도록 메서드를 변경했다.

    @SneakyThrows
    public CompletableFuture<Optional<UserEntity>> findById(String userId){
        return CompletableFuture.supplyAsync(() -> {
            log.info("UserRepository.findById: {}", userId);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            var user = userMap.get(userId);
            return Optional.ofNullable(user);
        });
    }

 

 

현재 에러만 잡아서 변경해놓은 코드는 다음과 같다.

    @SneakyThrows
    public Optional<User> getUserById(String id){
        return userRepository.findById(id).get()
                .map(this::getUser);
    }

    @SneakyThrows
    private User getUser(UserEntity user){
        var image = imageRepository.findById(user.getProfileImageId()).get()
                .map(imageEntity -> {
                    return new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl());
                });

        var articles = articleRepository.findAllByUserId(user.getId()).get()
                .stream().map(articleEntity ->
                        new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())).toList();

        var followCount = followRepository.countByUserID(user.getId()).get();

        return new User(
                user.getId(),
                user.getName(),
                user.getAge(),
                image,
                articles,
                followCount
        );
    }

 

이거를 Repository에 접근 할 때마다 CompletableFuture를 사용했다.

 

    @SneakyThrows
    public CompletableFuture<Optional<User>> getUserById(String id){
        return userRepository.findById(id)
                .thenCompose(this::getUser);
    }

    @SneakyThrows
    private CompletableFuture<Optional<User>> getUser(Optional<UserEntity> userEntityOptional) {
        if (userEntityOptional.isEmpty()) {
            return CompletableFuture.completedFuture(Optional.empty());
        }

        var userEntity = userEntityOptional.get();

        var imageFuture = imageRepository.findById(userEntity.getProfileImageId())
                .thenApplyAsync(imageEntityOptional ->
                        imageEntityOptional.map(imageEntity ->
                                new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl())
                        )
                );


        var articlesFuture = articleRepository.findAllByUserId(userEntity.getId())
                .thenApplyAsync(articleEntities ->
                        articleEntities.stream()
                                .map(articleEntity ->
                                        new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())
                                )
                                .collect(Collectors.toList())
                );

        var followCountFuture = followRepository.countByUserID(userEntity.getId());

        return CompletableFuture.allOf(imageFuture, articlesFuture, followCountFuture)
                .thenAcceptAsync(v -> {
                    log.info("Three futures are completed");
                })
                .thenRunAsync(() -> {
                    log.info("Three futures are also completed");
                })
                .thenApplyAsync(v -> {
                    try {
                        var image = imageFuture.get();
                        var articles = articlesFuture.get();
                        var followCount = followCountFuture.get();

                        return Optional.of(
                                new User(
                                        userEntity.getId(),
                                        userEntity.getName(),
                                        userEntity.getAge(),
                                        image,
                                        articles,
                                        followCount
                                )
                        );
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
    }
}

 

이렇게 변경하고 전에 사용했던 테스트 코드의 시간을 측정해보았다.

 

거의 반으로 줄어든 2sec 66ms가 나왔다.

 

전 코드에서 Repository에 접근 하는 것을 기다리기 보다 비동기적으로 실행한다면 시간을 크게 줄일 수 있는 것을 볼 수 있었던 것 같다.

'백엔드 > 리액티브 프로그래밍' 카테고리의 다른 글

ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture 인터페이스  (1) 2024.03.06
CompletionStage 인터페이스  (1) 2024.03.05
Future 인터페이스  (1) 2024.01.09
728x90

CompletableFuture 클래스

우선 주요 메서드 먼저 살펴보고 가도록 하자.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(ASYNC_POOL, supplier);
    }

	public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(ASYNC_POOL, runnable);
    }
    
    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }
    
    public boolean isCompletedExceptionally() {
        Object r;
        return ((r = result) instanceof AltResult) && r != NIL;
    }
    
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
    
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        int n; Object r;
        if ((n = cfs.length) <= 1)
            return (n == 0)
                ? new CompletableFuture<Object>()
                : uniCopyStage(cfs[0]);
        for (CompletableFuture<?> cf : cfs)
            if ((r = cf.result) != null)
                return new CompletableFuture<Object>(encodeRelay(r));
        cfs = cfs.clone();
        CompletableFuture<Object> d = new CompletableFuture<>();
        for (CompletableFuture<?> cf : cfs)
            cf.unipush(new AnyOf(d, cf, cfs));
        // If d was completed while we were adding completions, we should
        // clean the stack of any sources that may have had completions
        // pushed on their stack after d was completed.
        if (d.result != null)
            for (int i = 0, len = cfs.length; i < len; i++)
                if (cfs[i].result != null)
                    for (i++; i < len; i++)
                        if (cfs[i].result == null)
                            cfs[i].cleanStack();
        return d;
    }
}

 

 

supplyAsync

 

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(ASYNC_POOL, supplier);
    }

 

이렇게 구성이 되어 있었다.

 

보면 알 수 있듯이, 파라미터를 받지 않고도 결과를 만들어서 다음 task에 전달해준다.

 

runAsync

 

	public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(ASYNC_POOL, runnable);
    }

 

Runnable과 비슷하다.

값을 받지도, 값을 리턴하지도 않고 수행만 하게 된다.

 

complete

 

CompletableFuture가 완료되지 않았다면 주어진 값으로 채운다.

리턴되는 Boolean은 complete에 의해 상태가 바뀌었다면 true, 아니라면 false를 반환한다.

    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }

 

isCompletedExceptionally

 

CompletableFuture가 에러로 인해 중지가 되었는지 Boolean으로 반환하는 메서드이다.

var futureWithException = CompletableFuture.supplyAsync(() -> 1 / 0);

Thread.sleep(1000);

assert futureWithException.isDone();
assert futureWithException.isCompletedExceptionally();

 

이런식의 코드를 작성하여 확인 할 수 있다.

 

allOf

 

여러개의 CompletableFuture를 모아서 하나의 CompletableFuture로 변환할 수 있다.

모든 CompletableFuture가 완료되면 상태가 done으로 변경된다.

반환하는 값은 없기 때문에 각각의 값에 다시 접근하여 get으로 값을 가져와야 한다.

 

allOf를 테스트 해보기 위해 코드를 작성해서 확인해보자.

@Slf4j
public class A {

    @SneakyThrows
    public static void main(String[] args) {

        var startTime = System.currentTimeMillis();

        var firstFuture = waitAndReturn(100, 1);
        var secondFuture = waitAndReturn(500, 2);
        var thirdFuture = waitAndReturn(1000, 3);

        CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture)
                .thenAcceptAsync(v -> {
                    try{
                        log.info("first: {}", firstFuture.get());
                        log.info("second: {}", secondFuture.get());
                        log.info("third: {}", thirdFuture.get());
                    }catch (Exception e){
                        throw new RuntimeException(e);
                    }
                }).join();

        var endTime = System.currentTimeMillis();

        log.info("time: {}", endTime - startTime);
    }

    public static CompletableFuture<Integer> waitAndReturn(int time, int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return value;
        });
    }
}

 

해당 코드를 실행하면 다음과 같이 나온다.

 

차례차례 실행이 된 100 + 500 + 1000이 아닌 1000에 가까운 값이 나오는 것을 볼 수 있다.

 

anyOf

 

allOf와는 다르게 가장 먼저 끝난 Future의 값을 제공해준다.

 

방금과 비슷한 코드를 실행해보면

@Slf4j
public class A {

    @SneakyThrows
    public static void main(String[] args) {

        var startTime = System.currentTimeMillis();

        var firstFuture = waitAndReturn(100, 1);
        var secondFuture = waitAndReturn(500, 2);
        var thirdFuture = waitAndReturn(1000, 3);

        CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture)
                .thenAcceptAsync(v -> {
                    try{
                        log.info("Hi FirstValue");
                        log.info("value: {}", v);
                    }catch (Exception e){
                        throw new RuntimeException(e);
                    }
                }).join();

        var endTime = System.currentTimeMillis();

        log.info("time: {}", endTime - startTime);
    }

    public static CompletableFuture<Integer> waitAndReturn(int time, int value) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return value;
        });
    }
}

 

 

가장 빨리 실행이 되는 Future만 가져오는 것을 볼 수 있다.

 

 

+ Recent posts