728x90

그 다음은 HotPublisher이다.

 

소비자와 관계 없이 데이터를 한 번만 통지하는 것이 HotPublisher이다.

해당하는 데이터는 한 번만 통지하고, 소비자는 구독한 시점 이후에 통지된 데이터들만 전달 받을 수 있다.

 

바로 작성해보도록 하자.

Subscriber는 변함 없이 ColdPublisher에서 사용했던 코드를 그대로 사용한다.

 

  • Publisher

Cold와는 지속적으로 데이터를 전달한다.

@Slf4j
public class SimpleHotPublisher implements Flow.Publisher<Integer>{

    private final ExecutorService publisherExecutor = Executors.newSingleThreadExecutor();
    private final Future<Void> task;
    private List<Integer> numbers = new ArrayList<>();
    private List<SimpleHotSubscription> subscriptions = new ArrayList<>();

    public SimpleHotPublisher(){
        numbers.add(1);
        task = publisherExecutor.submit(() -> {
            for(int i = 2; !Thread.interrupted(); i++){
                numbers.add(i);
                subscriptions.forEach(SimpleHotSubscription::wakeup);
                Thread.sleep(100);
            }
            return null;
        });
    }

    public void shutdown(){
        this.task.cancel(true);
        publisherExecutor.shutdown();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        var subscription = new SimpleHotSubscription(subscriber);
        subscriber.onSubscribe(subscription);
        subscriptions.add(subscription);
    }

    private class SimpleHotSubscription implements Flow.Subscription{
        private int offset;
        private int requiredOffset;
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService subscriptionExecutorService = Executors.newSingleThreadExecutor();

        public SimpleHotSubscription(Flow.Subscriber<? super Integer> subscriber){
            int lastElementIndex = numbers.size() - 1;
            this.offset = lastElementIndex;
            this.requiredOffset = lastElementIndex;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            requiredOffset += n;

            onNextWhilePossible();
        }

        @Override
        public void cancel() {
            this.subscriber.onComplete();
            if(subscriptions.contains(this)){
                subscriptions.remove(this);
            }
            subscriptionExecutorService.shutdown();
        }

        public void wakeup(){
            onNextWhilePossible();
        }

        private void onNextWhilePossible(){
            subscriptionExecutorService.submit(() -> {
                while (offset < requiredOffset && offset < numbers.size()){
                    var item = numbers.get(offset);
                    subscriber.onNext(item);
                    offset++;
                }
            });
        }

    }
}

 

해당 코드를 실행해보자.

 

 

이렇게 시점에 따라 전달받는 데이터가 다른 것을 볼 수 있다.

 

 

'Spring > 리액티브 프로그래밍' 카테고리의 다른 글

RxJava Reactor  (2) 2024.03.14
Project reactor  (0) 2024.03.14
ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
728x90

Publisher 중에서 ColdPublisher를 구현해보자.

 

ColdPublisher에 대해 먼저 알아보자.

 

소비자가 구독 할 때마다 데이터를 처음부터 새로 통지하는 것을 ColdPublisher라고 한다.

소비자는 구독 시점과 상관 없이 통지된 데이터를 처음부터 전달 받게 된다.

 

모든 소비자가 같은 데이터를 전달받게 될 것이다.

 

간단하게 한 번 구현해보자.

 

  • Publisher, Subscription

우선 Publisher와 Subscription이다.

public class SimpleColdPublisher implements Flow.Publisher<Integer>{

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        var iterator = Collections.synchronizedList(
                IntStream.range(1, 10).boxed().collect(Collectors.toList())
        ).iterator();
        var subscription = new SimpleColdSubscription(iterator, subscriber);
        subscriber.onSubscribe(subscription);
    }

    public class SimpleColdSubscription implements Flow.Subscription{

        private final Iterator<Integer> iterator;
        private final Flow.Subscriber<? super Integer> subscriber;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        public SimpleColdSubscription(Iterator<Integer> iterator, Flow.Subscriber<? super Integer> subscriber) {
            this.iterator = iterator;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            executor.submit(() -> {
               for(int i = 0; i < n; i++){
                   if(iterator.hasNext()){
                       var number = iterator.next();
                       iterator.remove();
                       subscriber.onNext(number);
                   }
                   else{
                       subscriber.onComplete();
                       executor.shutdown();
                       break;
                   }
               }
            });
        }

        @Override
        public void cancel() {
            subscriber.onComplete();
        }
    }
}

 

ColdPublisher답게 전달할 데이터의 배열을 넘겨주게 된다.

 

해당 iterator의 데이터를 전부 전달해주고 종료된다.

 

 

  • Subscriber

그 다음은 Subscriber이다.

subscription을 인자로 받는 onSubscribe가 있으며, onNext를 호출 할 때마다 request로 데이터를 하나씩 받아온다.

 

@Slf4j
public class SimpleNamedSubscriber<T> implements Flow.Subscriber<T> {

    private Flow.Subscription subscription;
    private final String name;

    public SimpleNamedSubscriber(String name){
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
        log.info("onSubscribe");
    }

    @Override
    public void onNext(T item) {
        log.info("name: {}, onNext: {}", name, item);
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("onError: {}", throwable.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("onComplete");
    }
    
    public void cancel(){
        log.info("cancel");
        this.subscription.cancel();
    }
}

 

 

해당 코드를 실행해보자.

public class SimpleColdPublisherMain {

    @SneakyThrows
    public static void main(String[] args) {
        //create Publisher
        var publisher = new SimpleColdPublisher();

        var subscriber1 = new SimpleNamedSubscriber<Integer>("subscriber1");
        publisher.subscribe(subscriber1);

        Thread.sleep(5000);

        var subscriber2 = new SimpleNamedSubscriber<Integer>("subscriber2");
        publisher.subscribe(subscriber2);
    }
}

 

다음과 같은 코드로 Publisher와 Subscriber를 테스트 해보았다.

 

 

이렇게 subscriber1, subscriber2가 다른 시간에 subscribe를 했음에도 같은 결과가 나오는 것을 볼 수 있다.

'Spring > 리액티브 프로그래밍' 카테고리의 다른 글

Project reactor  (0) 2024.03.14
HotPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
CompletableFuture 인터페이스  (1) 2024.03.06
728x90

이번에 리액티브 프로그래밍을 공부하다가, 이런 용어들이 나왔다.

중간에 흐름을 조절해 준다는 역할을 한다고 하는데, 이해가 잘 되지 않아 일단 정리해 보고 넘어가려 한다.

 

 

우선 이해하기 쉽게 그림이다.

 

  • Publisher

Publisher가 Subscriber를 Subscribe한다.

Publisher는 request를 받으면 데이터를 생성하여 보낸다.

  • Subscriber

Subscriber가 Subscription을 onSubscribe 한다.

Subscriber는 필요할 때 Subscribe의 request를 통해 Publisher에게 데이터를 요청한다.

Subscriber는 onNext로 데이터를 받는다.

  • Subscription

Subscription은 Subscriber에 의해 등록된다.

 

 

모든 요청이 성공적으로 완료된다면 onComplete를, 요청이 실패하면 onError를 호출하고 흐름을 종료한다.

 

 

당연히 Publisher는 여러 개의 Subscriber를 Subscribe 가능하다.

 

하나씩 살펴보자

 

Publisher

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

 

하나의 메서드 밖에 없다.

subscribe로 Subscriber를 등록하면 된다.

 

Subscription

public static interface Subscription {
    public void request(long n);

    public void cancel();
}

 

 

  • requst

Subscriber가 데이터를 처리 가능 할 때 request를 호출한다.

파라미터 n은 Publisher에게 요청하는 데이터의 개수이다.

  • cancel

Publisher에게 데이터를 그만 보내라고 요청하는 메서드이다.

 

Subscriber

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    
    public void onNext(T item);

    public void onError(Throwable throwable);

    public void onComplete();
}

 

  • onSubscribe

Subscription을 파라미터로 받아 request를 호출한다.

Subscription의 request를 호출하는 것은 온전히 Subscriber의 결정이며, 호출되기 전까지는 어떤 데이터도 흐르지 않는다.

  • onNext

Publisher가 보낸 데이터이다.

  • onError

에러로 종료

  • onComplete

성공적으로 종료

'Spring > 리액티브 프로그래밍' 카테고리의 다른 글

HotPublisher 구현  (0) 2024.03.13
ColdPublisher 구현  (0) 2024.03.13
CompletableFuture를 사용한 성능튜닝  (0) 2024.03.06
CompletableFuture 인터페이스  (1) 2024.03.06
CompletionStage 인터페이스  (1) 2024.03.05
728x90

https://www.acmicpc.net/problem/1253

 

1253번: 좋다

첫째 줄에는 수의 개수 N(1 ≤ N ≤ 2,000), 두 번째 줄에는 i번째 수를 나타내는 Ai가 N개 주어진다. (|Ai| ≤ 1,000,000,000, Ai는 정수)

www.acmicpc.net

공유기와 비슷하게 함수를 만들고 하나씩 돌아가면서 확인하도록 만들었다.

 

import sys

N = int(sys.stdin.readline().strip())
A = list(map(int, sys.stdin.readline().split()))

A.sort()

result = 0


def isGood(goal):
    left, right = 0, N - 1

    while left < right:
        if A[left] + A[right] == A[goal]:
            if left == goal:
                left += 1
            elif right == goal:
                right -= 1
            else:
                return True
        elif A[left] + A[right] > A[goal]:
            right -= 1
        elif A[left] + A[right] < A[goal]:
            left += 1


for i in range(N):
    result += 1 if isGood(i) is True else 0

print(result)

 

'알고리즘 > 이분탐색' 카테고리의 다른 글

백준 2110 공유기 설치 (Python)  (0) 2024.03.10
백준 1654 랜선 자르기 (Python)  (0) 2024.03.10
백준 2805 나무 자르기 (Python)  (0) 2024.03.10
728x90

https://www.acmicpc.net/problem/2110

 

2110번: 공유기 설치

첫째 줄에 집의 개수 N (2 ≤ N ≤ 200,000)과 공유기의 개수 C (2 ≤ C ≤ N)이 하나 이상의 빈 칸을 사이에 두고 주어진다. 둘째 줄부터 N개의 줄에는 집의 좌표를 나타내는 xi (0 ≤ xi ≤ 1,000,000,000)가

www.acmicpc.net

당연하게 이분 탐색으로 풀었다.

이분탐색을 푸는 루틴이 생긴 것 같다.

우선 함수를 만들고 그 함수를 기준으로 left 쪽으로 갈지, right 쪽으로 갈지 정하는 것이다.

 

이번 문제의 함수에서 반환하는 값은 공유기의 개수이다.

거리를 기준으로 공유기를 놓아보고, 만약 공유기가 더 필요하다면 거리를 늘리는 것이다.

 

import sys

N, M = map(int, sys.stdin.readline().split())

house = []

for i in range(N):
    house.append(int(sys.stdin.readline()))

house.sort()


def wifi(length):
    result = 1
    cur = house[0]
    for i in range(1, N):
        if house[i] >= cur + length:
            cur = house[i]
            result += 1

    return result


left = 1
right = house[-1] - house[0]

while left <= right:
    mid = (left + right) // 2
    cnt = wifi(mid)

    if cnt >= M:
        left = mid + 1
    else:
        right = mid - 1

print(right)

'알고리즘 > 이분탐색' 카테고리의 다른 글

백준 1253 좋다 (Python)  (0) 2024.03.10
백준 1654 랜선 자르기 (Python)  (0) 2024.03.10
백준 2805 나무 자르기 (Python)  (0) 2024.03.10
728x90

https://www.acmicpc.net/problem/1654

 

1654번: 랜선 자르기

첫째 줄에는 오영식이 이미 가지고 있는 랜선의 개수 K, 그리고 필요한 랜선의 개수 N이 입력된다. K는 1이상 10,000이하의 정수이고, N은 1이상 1,000,000이하의 정수이다. 그리고 항상 K ≦ N 이다. 그

www.acmicpc.net

전에 풀었던 나무 자르기와 비슷하다.

당연히 이분탐색으로 바로 풀었고, 결과를 구해주는 함수는 작성했다.

굳이 설명을 더 적지는 않도록 하겠다.

 

import sys

K, N = map(int, sys.stdin.readline().split())

lan = list()
max_lan = 0

for i in range(K):
    cur_lan = int(sys.stdin.readline().strip())
    lan.append(cur_lan)
    max_lan = max(max_lan, cur_lan)


def cur_lan(length):
    result = 0
    for lan_length in lan:
        result += (lan_length // length)
    return result


left, right = 0, max_lan

while left <= right:
    mid = (left + right) // 2
    mid = max(1, mid)
    lan_count = cur_lan(mid)

    if lan_count >= N:
        left = mid + 1
    else:
        right = mid - 1

print(right)

'알고리즘 > 이분탐색' 카테고리의 다른 글

백준 1253 좋다 (Python)  (0) 2024.03.10
백준 2110 공유기 설치 (Python)  (0) 2024.03.10
백준 2805 나무 자르기 (Python)  (0) 2024.03.10
728x90

 

import sys

N, M = map(int, sys.stdin.readline().split())

tree = list(map(int, sys.stdin.readline().split()))


def cut(height):
    result = 0
    for i in tree:
        result += max(0, i - height)
    return result


left, right = 0, max(tree)

while left <= right:
    mid = (left + right) // 2
    cur_height = cut(mid)

    if cur_height >= M:
        left = mid + 1
    else:
        right = mid - 1

print(right)

https://www.acmicpc.net/problem/2805

 

2805번: 나무 자르기

첫째 줄에 나무의 수 N과 상근이가 집으로 가져가려고 하는 나무의 길이 M이 주어진다. (1 ≤ N ≤ 1,000,000, 1 ≤ M ≤ 2,000,000,000) 둘째 줄에는 나무의 높이가 주어진다. 나무의 높이의 합은 항상 M보

www.acmicpc.net

백준 이분 탐색 중에 가장 많이 푼 문제 중 하나일 거 같다.

그냥 Left, Right를 잡고 범위를 좁혀나간 후 푸는 이분탐색 문제이다.

 

얻을 수 있는 나무의 길이를 구하기 위해 높이에 따라 얻는 나무를 구해주는 함수는 만들어서 사용했다.

 

 

 

'알고리즘 > 이분탐색' 카테고리의 다른 글

백준 1253 좋다 (Python)  (0) 2024.03.10
백준 2110 공유기 설치 (Python)  (0) 2024.03.10
백준 1654 랜선 자르기 (Python)  (0) 2024.03.10
728x90

지금까지 배운 것을 바탕으로 성능을 개선해보자.

 

동기적으로 작성된 코드를 비동기적으로 변경하는 것이다.

현재 동기적으로 작성된 코드는 다음과 같다.

public Optional<User> getUserById(String id){
        return userRepository.findById(id)
                .map(user -> {
                    var image = imageRepository.findById(user.getProfileImageId())
                            .map(imageEntity -> {
                                return new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl());
                            });

                    var articles = articleRepository.findAllByUserId(user.getId())
                            .stream().map(articleEntity ->
                                    new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())).toList();

                    var followCount = followRepository.countByUserID(user.getId());

                    return new User(
                            user.getId(),
                            user.getName(),
                            user.getAge(),
                            image,
                            articles,
                            followCount
                    );
                });
    }

 

Repository에서 가져올 때마다 1초 정도 시간이 걸린다고 생각하여 1초 정도 Thread.sleep을 사용했고, 조회한 값들을 다른 객체에 넣어서 반환하는 메서드이다.

 

차례대로 1초씩 3번 호출하기 때문에 적어도 3초 이상의 시간이 소모될 것이다.

 

    void testGetUser(){
        //given
        String userId = "1234";

        //when
        Optional<User> optionalUser = userBlockingService.getUserById(userId);

        //then
        assertFalse(optionalUser.isEmpty());
        var user = optionalUser.get();
        assertEquals(user.getName(), "sk");

        assertFalse(user.getProfileImage().isEmpty());
        assertFalse(user.getProfileImage().isEmpty());
        var image = user.getProfileImage().get();
        assertEquals(image.getId(), "image#1000");
        assertEquals(image.getName(), "profileImage");
        assertEquals(image.getUrl(), "https://avatars.githubusercontent.com/u/98071131?s=400&u=9107a0b50b52da5bbc8528157eed1cca34feb3c5&v=4");

        assertEquals(2, user.getArticleList().size());

        assertEquals(1000, user.getFollowCount());
    }

 

해당 테스트 코드의 시간을 확인 했을 때 4sec 74ms가 소모되었다.

 

이제 변경해보도록 하자.

일단 각 Repository를 CompletableFuture을 반환하도록 메서드를 변경했다.

    @SneakyThrows
    public CompletableFuture<Optional<UserEntity>> findById(String userId){
        return CompletableFuture.supplyAsync(() -> {
            log.info("UserRepository.findById: {}", userId);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            var user = userMap.get(userId);
            return Optional.ofNullable(user);
        });
    }

 

 

현재 에러만 잡아서 변경해놓은 코드는 다음과 같다.

    @SneakyThrows
    public Optional<User> getUserById(String id){
        return userRepository.findById(id).get()
                .map(this::getUser);
    }

    @SneakyThrows
    private User getUser(UserEntity user){
        var image = imageRepository.findById(user.getProfileImageId()).get()
                .map(imageEntity -> {
                    return new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl());
                });

        var articles = articleRepository.findAllByUserId(user.getId()).get()
                .stream().map(articleEntity ->
                        new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())).toList();

        var followCount = followRepository.countByUserID(user.getId()).get();

        return new User(
                user.getId(),
                user.getName(),
                user.getAge(),
                image,
                articles,
                followCount
        );
    }

 

이거를 Repository에 접근 할 때마다 CompletableFuture를 사용했다.

 

    @SneakyThrows
    public CompletableFuture<Optional<User>> getUserById(String id){
        return userRepository.findById(id)
                .thenCompose(this::getUser);
    }

    @SneakyThrows
    private CompletableFuture<Optional<User>> getUser(Optional<UserEntity> userEntityOptional) {
        if (userEntityOptional.isEmpty()) {
            return CompletableFuture.completedFuture(Optional.empty());
        }

        var userEntity = userEntityOptional.get();

        var imageFuture = imageRepository.findById(userEntity.getProfileImageId())
                .thenApplyAsync(imageEntityOptional ->
                        imageEntityOptional.map(imageEntity ->
                                new Image(imageEntity.getId(), imageEntity.getName(), imageEntity.getUrl())
                        )
                );


        var articlesFuture = articleRepository.findAllByUserId(userEntity.getId())
                .thenApplyAsync(articleEntities ->
                        articleEntities.stream()
                                .map(articleEntity ->
                                        new Article(articleEntity.getId(), articleEntity.getTitle(), articleEntity.getContent())
                                )
                                .collect(Collectors.toList())
                );

        var followCountFuture = followRepository.countByUserID(userEntity.getId());

        return CompletableFuture.allOf(imageFuture, articlesFuture, followCountFuture)
                .thenAcceptAsync(v -> {
                    log.info("Three futures are completed");
                })
                .thenRunAsync(() -> {
                    log.info("Three futures are also completed");
                })
                .thenApplyAsync(v -> {
                    try {
                        var image = imageFuture.get();
                        var articles = articlesFuture.get();
                        var followCount = followCountFuture.get();

                        return Optional.of(
                                new User(
                                        userEntity.getId(),
                                        userEntity.getName(),
                                        userEntity.getAge(),
                                        image,
                                        articles,
                                        followCount
                                )
                        );
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
    }
}

 

이렇게 변경하고 전에 사용했던 테스트 코드의 시간을 측정해보았다.

 

거의 반으로 줄어든 2sec 66ms가 나왔다.

 

전 코드에서 Repository에 접근 하는 것을 기다리기 보다 비동기적으로 실행한다면 시간을 크게 줄일 수 있는 것을 볼 수 있었던 것 같다.

'Spring > 리액티브 프로그래밍' 카테고리의 다른 글

ColdPublisher 구현  (0) 2024.03.13
Publisher, Subscriber에 대하여  (0) 2024.03.12
CompletableFuture 인터페이스  (1) 2024.03.06
CompletionStage 인터페이스  (1) 2024.03.05
Future 인터페이스  (1) 2024.01.09

+ Recent posts