Hot / Cold Publisher 에 대해 가볍게 정리한 글입니다.

들어가면서

처음 리액티브 프로그래밍/리액터를 공부하게 되면 보통 아래와 같은 코드를 자주 작성하게 됩니다.

  • 1 ~ 10 까지의 데이터를 처음부터 끝까지 출력하는 예제 코드
JAVA
Flux.range(1, 10)
    .doOnNext(i -> System.out.println("# emit: " + i))
    .subscribe();

이러한 코드는 ** 언제, 몇 번을 구독하든 상관없이, 구독할 때마다 1부터 10까지 같은 값들을 처음부터 끝까지 다시 흘려보냅니다.** 그래서 리액터를 처음 접할 때는, 자연스럽게 “구독만 하면 각 구독자가 자기 차례로 처음부터 끝까지 같은 데이터를 받는다” 는 식의 동작만을 떠올리기 쉽습니다.

만약 항상 현재 시각을 보여줘야 하는 디지털 시계를 리액터로 구현하려면 어떻게 해야 할까요? 이러한 ** 디지털 시계는 위의 예제처럼 매번 같은 데이터를 처음부터 끝까지 다시 보여줄 필요는 없습니다.** 사용자가 언제 화면을 열든 그 순간의 현재 시각만 계속 갱신해서 보여주면 충분하죠. 구독할 때마다 1부터 10까지를 처음부터 다시 흘려보내 주는 Flux.range(1, 10)과 달리, 이런 경우에는 ** 이미 흘러가고 있는 “현재 시각 스트림”에 구독자가 중간부터 합류하는 쪽이 훨씬 자연스러운 모델입니다.** 아래는 이런 디지털 시계를 리액터로 표현한 간단한 예제입니다.

  • ** 현재 시각을 출력하는 디지털 시계 예제**
    JAVA
    DateTimeFormatter formatter =
            DateTimeFormatter.ofPattern("HH:mm:ss");
    
    // 1초마다 현재 시각을 흘려보내는 "공유되는" Hot 스트림
    Flux<String> clock = Flux.interval(Duration.ofSeconds(1))
            .map(tick -> LocalDateTime.now().format(formatter))
            .share(); // 여러 구독자가 같은 흐름을 공유
    
    // 구독자 A: 바로 구독 시작
    clock.subscribe(time -> System.out.println("A: " + time));
    
    // 3초 뒤에 구독자 B가 중간부터 합류
    Thread.sleep(3000);
    clock.subscribe(time -> System.out.println("B: " + time));
    
    // 메인 스레드를 일정 시간 동안 유지
    Thread.sleep(5000);
    

위 예제에서 share()는 돌아가는 스트림 하나를 여럿이 같이 볼 수 있도록 하는 연산자 입니다. 이제 리액터의 Publisher“항상 처음부터 데이터를 흘려보내는 쪽”“이미 흘러가고 있는 흐름에 중간부터 합류하는 쪽” 으로 나눠서 살펴보겠습니다.

항상 처음부터 vs 중간부터 합류

리액티브 프로그래밍/리액터는 ** 항상 처음부터 데이터를 흘려보내는 Publisher와, 이미 흘러가고 있는 흐름에 중간부터 합류하는 Publisher**를 아래와 같이 정의해 구분합니다.

  • ** 항상 처음부터** 데이터를 흘려보내는 Publisher : Cold Publisher
  • ** 이미 흘러가고 있는 흐름에 중간부터 합류 **하는 Publisher : Hot Publisher

Hot Publisher 와 Cold Publisher 에 대해서 말하기 전에 Hot / Cold 가 어떠한 경우에 사용되는 용어인지 를 먼저 정리해 보겠습니다.


Hot / Cold 용어 정리하기

사실 Hot / Cold 라는 표현은 생각보다 여러 곳에서 쓰입니다. 그중 개발자가 가장 자주 마주치는 예는 바로 캐시(cache) 입니다.

용어의미 설명
Cold 캐시** 캐시가 비어 있거나, 아직 유의미한 데이터가 거의 없는 상태.** 요청이 들어와야 DB·외부 API를 호출해 캐시를 채우기 시작하는, ** 아직 준비가 덜 된 상태.**
Hot 캐시** 자주 사용하는 데이터가 이미 캐시에 충분히 올라와 있어서, 대부분의 요청이 캐시만 보고 바로 처리되는, 잘 달궈져 있는 상태.**

정리해 보면, Cold는 “요청이 들어와야 비로소 채워지는, 아직 준비가 덜 된 상태”, Hot은 “이미 충분히 달궈져 있어서, 들어오는 대로 바로바로 처리할 수 있는 상태” 를 가리킨다고 볼 수 있습니다. 리액터에서 말하는 Cold / Hot Publisher 도 그대로 이해하면됩니다. 이제부터는 이걸 “항상 처음부터 흘려보내는 스트림”과 “이미 흘러가고 있는 스트림” 관점에서 한 번 살펴보겠습니다.


Cold Publisher

Cold Publisher는 ** 요청이 들어와야 비로소 데이터를 흘려보내는** 스트림을 의미합니다. ** 즉, 구독자가 구독하기 전까지는 데이터가 흘러가지 않는 스트림 **을 의미합니다.

  • Cold Publisher 예제를 통해 이해하기
    JAVA
    Flux<Integer> flux =
        Flux.range(1, 3)
            .doOnNext(i -> System.out.println("# emit: " + i));
    
    flux.subscribe(i -> System.out.println("# A: " + i));
    System.out.println("----");
    flux.subscribe(i -> System.out.println("# B: " + i));
    

위의 예제를 실행해보면 알 수 있듯이 AB 모두 1부터 3까지를 각자 처음부터 다시 받습니다. 그렇다면 Cold Publisher 는 필요할 때마다 처음부터 다시 재생하는 VOD 영상에 더 가깝다 고 볼 수 있습니다.


Hot Publisher

Hot Publisher이미 흘러가고 있는 흐름에 중간부터 합류 하는 스트림을 의미합니다. 즉, 구독자가 구독하기 전에도 데이터가 흘러가고 있는 스트림 을 의미합니다.

  • Hot Publisher 예제를 통해 이해하기
    JAVA
    Flux<Long> flux =
        Flux.interval(Duration.ofSeconds(1))
            .doOnNext(i -> System.out.println("# emit: " + i))
            .share(); // 여러 구독자가 같은 흐름을 공유
    
    flux.subscribe(i -> System.out.println("A: " + i));
    
    Thread.sleep(3000);
    
    flux.subscribe(i -> System.out.println("B: " + i));
    

이 코드는 1초마다 숫자를 증가시키면서 계속 흘려보내는 스트림을 만든 뒤, share()를 통해 여러 구독자가 같은 흐름을 공유 하게 만듭니다.

  • 코드 실행 결과
    SHELL
    emit: 0
    A: 0
    
    emit: 1
    A: 1
    
    emit: 2
    A: 2
    
    emit: 3
    A: 3
    B: 3
    
    emit: 4
    A: 4
    B: 4
    

위 실행 결과를 보면 알 수 있듯이, 구독자 A는 스트림이 시작될 때부터 0, 1, 2, 3 … 을 순서대로 모두 보지만, 구독자 B는 3초 뒤에 구독을 시작했기 때문에 0, 1, 2는 전혀 보지 못하고 3부터 볼 수 있습니다. 이렇게 Hot Publisher 는 “필요할 때마다 처음부터 다시 재생하는 VOD”라기보다는, ** 이미 방송 중인 라이브 스트림에 중간부터 채널을 맞추는 것 **에 더 가깝다고 이해할 수 있습니다.

마무리하기

  • Cold Publisher: 구독자가 구독하기 전까지는 데이터가 흘러가지 않는 스트림
  • Hot Publisher: 구독자가 구독하기 전에도 데이터가 흘러가고 있는 스트림

언제 사용하면 좋을까 ?

용어언제 사용할까 ?실상황 예시
Cold Publisher각 구독자가 ** 처음부터 모든 데이터를 다시 받아야 할 때**Flux.range, Flux.fromIterable, Mono.fromCallable 같은 요청‑응답형 API 호출, DB 조회, 파일 읽기처럼 호출할 때마다 새로 수행되는 작업
Hot Publisher이미 발생 중인 이벤트를 여러 구독자가 ** 현재 시점부터 같이 바라보면 될 때**Flux.interval().share(), Sinks.many().multicast(), WebSocket 브로드캐스트, UI 이벤트 스트림처럼 실시간으로 흘러가는 이벤트 스트림

주의할 점

1. Hot Publisher에 늦게 구독하면 이전 데이터를 받지 못한다

Sinks.many().multicast()는 구독 이후의 데이터만 전달한다. 이전 데이터가 필요하면 replay()를 사용해야 한다.

2. Cold Publisher를 여러 번 구독하면 소스가 여러 번 실행된다

HTTP 호출이나 DB 쿼리를 Cold Publisher로 감싸면, 구독할 때마다 호출이 반복된다. cache() 연산자로 결과를 캐싱해야 한다.

3. share()와 replay()의 차이를 혼동하면 데이터 유실이 발생한다

share()는 첫 구독자부터 데이터를 흘리고 늦은 구독자는 이후 데이터만 받는다. replay()는 지정한 수만큼 이전 데이터를 재생해준다.

댓글 로딩 중...