리액티브 프로그래밍 — Reactor와 Reactive Streams 이해하기
만 명이 동시에 API를 호출하면, 스레드 만 개를 만들어야 할까? 전통적인 서블릿 모델은 요청 하나당 스레드 하나를 할당한다. 그런데 스레드 하나당 약 1MB의 메모리를 차지하고, 수가 늘어날수록 컨텍스트 스위칭 비용이 급증한다. 리액티브 프로그래밍은 적은 스레드로 많은 요청을 처리 하는 패러다임이다.
▸ TIP 이 글의 코드 예제를 직접 실행해보고 싶다면 Java 기본기 핸드북을 확인해보세요.
1. 리액티브가 왜 필요한가
동기 블로킹의 한계
[요청 1] → [스레드 1] → DB 쿼리 (200ms 대기) → 응답
[요청 2] → [스레드 2] → 외부 API (500ms 대기) → 응답
...
[요청 201] → 스레드 풀 고갈 → 대기열에서 기다림
Tomcat의 기본 스레드 풀은 200개예요. 201번째 요청부터는 앞선 요청이 끝나기를 기다려야 합니다. 스레드가 실제로 CPU를 쓰는 시간은 얼마 안 되는데, 대부분 I/O 대기 상태로 놀고 있어요.
thread-per-request 모델의 문제
- ** 메모리 **: 스레드 하나당 약 1MB의 스택 메모리. 1,000개면 1GB
- ** 컨텍스트 스위칭 **: 스레드가 많아지면 OS의 전환 비용이 급격히 증가
- ** 확장성 한계 **: 동시 접속자가 수만 명이면 스레드를 수만 개 만들 수 없습니다
리액티브 프로그래밍은 ** 적은 수의 스레드로 많은 요청을 처리 **하는 방식입니다. I/O 대기 동안 스레드를 점유하지 않고, 데이터가 준비되면 콜백으로 이어서 처리해요.
2. Reactive Streams 스펙
리액티브 프로그래밍의 표준 인터페이스가 Reactive Streams 입니다. Java 9에서 java.util.concurrent.Flow로 편입되었고, 핵심은 네 개의 인터페이스예요.
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s); // 구독 등록
}
public interface Subscriber<T> {
void onSubscribe(Subscription s); // 구독 시작
void onNext(T t); // 데이터 수신
void onError(Throwable t); // 에러 발생
void onComplete(); // 완료 신호
}
public interface Subscription {
void request(long n); // n개의 데이터를 요청 (배압의 핵심)
void cancel(); // 구독 취소
}
// Publisher이자 Subscriber — 중간 처리 단계
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
동작 흐름
Publisher Subscriber
│◄── subscribe() ────────────│
│── onSubscribe(Subscription)──►│
│◄── request(3) ─────────────│ ← 3개 요청
│── onNext(data1) ──────────►│
│── onNext(data2) ──────────►│
│── onNext(data3) ──────────►│
│◄── request(2) ─────────────│ ← 추가 2개 요청
│── onNext(data4) ──────────►│
│── onNext(data5) ──────────►│
│── onComplete() ───────────►│
핵심은 Subscriber가 먼저 request(n)을 호출해야 Publisher가 데이터를 보낸다 는 점이에요. 이것이 배압(Backpressure)의 기본 원리입니다.
3. 배압(Backpressure)
Publisher가 초당 10,000건을 발행하는데 Subscriber가 초당 100건만 처리할 수 있으면 어떻게 될까요? 배압이 없으면 메모리에 데이터가 쌓여서 OutOfMemoryError가 발생합니다.
request(n)을 통해 Subscriber가 처리 가능한 만큼만 요청하면, 소비 속도에 맞춰 발행 속도가 자연스럽게 조절돼요.
| 전략 | 설명 | 사용 시나리오 |
|---|---|---|
| Buffer | 초과 데이터를 버퍼에 저장 | 일시적인 속도 차이 |
| Drop | 초과 데이터를 버림 | 최신 데이터만 중요한 경우 |
| Latest | 가장 최신 데이터만 유지 | 실시간 가격, 센서 데이터 |
| Error | 초과 시 에러 발생 | 데이터 유실 불가 |
Flux.range(1, 1000)
.onBackpressureBuffer(100) // 최대 100개까지 버퍼링
.subscribe(data -> slowProcess(data));
4. Reactor 소개 — Mono와 Flux
Reactor 는 Reactive Streams 스펙의 구현체이자 Spring WebFlux의 기반 라이브러리입니다. 핵심 타입은 딱 두 개예요.
Mono<T>: 0 또는 1개의 데이터를 발행Flux<T>: 0 ~ N개의 데이터를 발행
중요한 원칙이 하나 있는데요, ** 구독하기 전까지 아무 일도 일어나지 않습니다.** subscribe()를 호출해야 파이프라인이 실행돼요.
Mono<String> mono = Mono.just("안녕하세요");
// 여기서는 아무 일도 안 일어남
mono.subscribe(System.out::println);
// 이제 "안녕하세요" 출력
5. Mono — 0 또는 1개의 데이터
// 생성
Mono<String> mono = Mono.just("데이터");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("실패"));
// 지연 생성 — 구독 시점에 실행
Mono<String> deferred = Mono.defer(() ->
Mono.just("현재 시간: " + System.currentTimeMillis())
);
// 블로킹 호출을 감싸기
Mono<String> fromCallable = Mono.fromCallable(() -> fetchFromDatabase());
변환과 에러 처리
Mono.just("hello")
.map(s -> s.toUpperCase()) // 동기 변환: "HELLO"
.flatMap(s -> saveToDb(s)) // 비동기 변환: Mono<Result> 반환
.onErrorResume(e -> { // 에러 시 대체 Mono
log.warn("실패: {}", e.getMessage());
return fetchFromCache();
})
.subscribe(result -> log.info("완료: {}", result));
실무 패턴 — 있으면 반환, 없으면 생성
public Mono<User> findOrCreate(String email) {
return userRepository.findByEmail(email)
.switchIfEmpty(Mono.defer(() -> { // 비어있으면 실행
User newUser = new User(email);
return userRepository.save(newUser);
}));
}
Mono.defer()로 감싸는 이유는 구독 시점에 실행되도록 지연하기 위해서예요. 감싸지 않으면 switchIfEmpty와 관계없이 즉시 실행됩니다.
6. Flux — 0~N개의 데이터
Flux<String> flux = Flux.just("A", "B", "C");
Flux<Integer> range = Flux.range(1, 10);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
// 필터와 변환
Flux.range(1, 100)
.filter(n -> n % 2 == 0) // 짝수만
.map(n -> n * 10) // 10배
.take(5) // 처음 5개만
.subscribe(System.out::println); // 20, 40, 60, 80, 100
// 여러 Flux 결합
Flux.zip(flux1, flux2, (s, n) -> s + n); // 쌍으로 묶기: A1, B2, C3
Flux.concat(flux1, flux2); // 순서대로 이어 붙이기
Flux.merge(flux1, flux2); // 도착 순서대로 (순서 미보장)
7. 핵심 연산자
map vs flatMap
이 둘의 차이를 확실히 알아야 합니다.
// map: 동기 1:1 변환 (T → R)
Flux.just(1, 2, 3)
.map(n -> n * 2)
.subscribe(System.out::println); // 2, 4, 6
// flatMap: 비동기 1:N 변환 (T → Publisher<R>)
Flux.just(1, 2, 3)
.flatMap(n -> fetchData(n)) // 각 값마다 비동기 호출
.subscribe(System.out::println); // 결과 순서 보장 안 됨
| 구분 | map | flatMap |
|---|---|---|
| 변환 타입 | T → R | T → Publisher<R> |
| 동기/비동기 | 동기 | 비동기 |
| 순서 보장 | 보장 | 보장 안 됨 |
| 사용 시점 | 단순 값 변환 | DB 조회, API 호출 등 |
flatMap vs concatMap
flatMap은 순서를 보장하지 않아요. 순서가 중요하면 concatMap을 써야 합니다.
// flatMap: 병렬 실행, 순서 미보장 (빠름)
Flux.just(3, 1, 2)
.flatMap(n -> Mono.just(n).delayElement(Duration.ofMillis(n * 100)))
.subscribe(System.out::println); // 1, 2, 3
// concatMap: 순차 실행, 순서 보장 (느림)
Flux.just(3, 1, 2)
.concatMap(n -> Mono.just(n).delayElement(Duration.ofMillis(n * 100)))
.subscribe(System.out::println); // 3, 1, 2
switchIfEmpty vs defaultIfEmpty
Mono.empty().defaultIfEmpty("기본값"); // 고정 값 반환
Mono.empty().switchIfEmpty(fetchDefaultFromDb()); // 대체 Publisher 실행
zip — 여러 비동기 작업 동시 실행
Mono<User> user = userService.findById(userId);
Mono<List<Order>> orders = orderService.findByUserId(userId);
Mono<Point> point = pointService.getBalance(userId);
Mono.zip(user, orders, point)
.map(tuple -> new UserProfile(tuple.getT1(), tuple.getT2(), tuple.getT3()))
.subscribe(profile -> log.info("프로필 조회 완료"));
세 API가 동시에 실행되므로, 가장 느린 API의 응답 시간이 전체 응답 시간이 됩니다.
8. 스케줄러
| 스케줄러 | 용도 | 스레드 특성 |
|---|---|---|
Schedulers.immediate() | 현재 스레드 | 스레드 전환 없음 |
Schedulers.single() | 단일 스레드 | 순차 처리 |
Schedulers.parallel() | CPU 집약적 작업 | CPU 코어 수만큼 |
Schedulers.boundedElastic() | 블로킹 I/O | 최대 10 × CPU 코어 |
publishOn vs subscribeOn
Flux.range(1, 3)
.map(i -> {
log.info("[map1] {} on {}", i, Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.parallel()) // 여기부터 parallel 스레드
.map(i -> {
log.info("[map2] {} on {}", i, Thread.currentThread().getName());
return i + 1;
})
.subscribeOn(Schedulers.boundedElastic()) // 소스 구독은 elastic 스레드
.subscribe();
// [map1] 1 on boundedElastic-1 ← subscribeOn의 영향
// [map2] 10 on parallel-1 ← publishOn의 영향
기억할 포인트:
subscribeOn은 체인 어디에 있든 소스의 실행 스레드를 변경합니다publishOn은 선언 위치 이후의 연산자에만 영향을 줍니다- 블로킹 작업은 반드시
Schedulers.boundedElastic()에서 실행해야 해요
// 블로킹 DB 호출을 리액티브로 감싸기
Mono<User> user = Mono.fromCallable(() ->
jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = ?", userRowMapper, userId)
)
.subscribeOn(Schedulers.boundedElastic());
절대로 Schedulers.parallel()에서 블로킹 호출을 하면 안 됩니다. parallel 스레드가 고갈되면 전체 파이프라인이 멈춰요.
9. 에러 처리
리액티브에서는 try-catch를 쓸 수 없어요. 파이프라인 안에서 에러를 처리해야 합니다.
// onErrorReturn — 기본값 반환
Mono.just("abc")
.map(s -> Integer.parseInt(s))
.onErrorReturn(0)
.subscribe(System.out::println); // 0
// onErrorResume — 대체 로직 실행 (특정 예외 타입 지정 가능)
Mono.just("abc")
.map(s -> Integer.parseInt(s))
.onErrorResume(NumberFormatException.class, e -> Mono.just(-1))
.subscribe(System.out::println); // -1
// retry — 재시도
webClient.get().uri("/api/data").retrieve()
.bodyToMono(String.class)
.retry(3) // 최대 3회 재시도
.subscribe(System.out::println);
// retryWhen — 지수 백오프로 세밀한 재시도 제어
webClient.get().uri("/api/data").retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10)))
.subscribe(System.out::println);
doOnError는 에러를 소비하지 않아요. 로깅용으로 쓰고, 실제 복구는 onErrorReturn이나 onErrorResume에서 해야 합니다.
10. WebFlux와의 관계
Spring MVC (동기 블로킹)
요청 → 스레드 → DB 쿼리 ── 대기 ── 응답 (스레드가 대기하는 동안 아무것도 못 함)
Spring WebFlux (비동기 논블로킹)
요청 → 이벤트 루프 → DB 쿼리 ── 콜백 등록 (스레드가 다른 요청 처리)
WebFlux는 Reactor를 기반으로 동작합니다. 컨트롤러에서 Mono와 Flux를 반환하면 돼요.
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findById(id);
}
@GetMapping("/users")
public Flux<User> getAllUsers() {
return userService.findAll();
}
언제 리액티브를 쓰고, 언제 쓰지 않는가
적합한 경우: 높은 동시성(채팅, 알림, 스트리밍), I/O 대기가 긴 서비스, MSA 게이트웨이
** 불필요하거나 오히려 안 좋은 경우:** CRUD 위주 서비스, 팀이 리액티브에 익숙하지 않은 경우, JPA·MyBatis 같은 블로킹 라이브러리 중심, CPU 집약적 작업
리액티브를 무조건 적용하는 것은 오히려 해가 될 수 있어요. ** 트레이드오프 **를 이해하는 것이 핵심입니다.
11. Virtual Threads vs Reactive
Java 21에서 가상 스레드가 정식 출시되면서 선택지가 하나 더 생겼습니다.
// 가상 스레드: 기존 동기 코드를 그대로 쓰면서 확장성 확보
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 10_000; i++) {
executor.submit(() -> {
// 블로킹 코드도 OK — 가상 스레드가 알아서 양보
return httpClient.send(request, BodyHandlers.ofString()).body();
});
}
}
| 구분 | Reactive (Reactor) | Virtual Threads |
|---|---|---|
| 프로그래밍 모델 | 선언적 파이프라인 | 기존 명령형 코드 |
| 학습 곡선 | 높음 | 낮음 |
| 디버깅 | 어려움 (스택 트레이스 복잡) | 쉬움 (일반 스레드와 동일) |
| 배압 지원 | 내장 | 직접 구현 필요 |
| 생태계 | WebFlux, R2DBC, WebClient | 기존 JDBC, JPA 그대로 사용 |
| 최소 Java 버전 | Java 8+ | Java 21+ |
** 방향성:**
- 새 프로젝트 + Java 21+이라면 가상 스레드부터 고려
- 기존 WebFlux 프로젝트는 굳이 마이그레이션할 필요 없음
- 실시간 스트리밍/이벤트 기반이면 Reactor가 더 적합
- 핵심은 "둘 다 이해하고, 상황에 따라 선택할 수 있는 것"입니다
12. 정리 테이블
| 개념 | 설명 |
|---|---|
| Reactive Streams | 비동기 스트림 처리의 표준 인터페이스 (Publisher, Subscriber, Subscription) |
| 배압 (Backpressure) | Subscriber가 처리 가능한 속도로 데이터를 요청하는 메커니즘 |
| Mono | 0~1개의 데이터를 발행하는 Publisher |
| Flux | 0~N개의 데이터를 발행하는 Publisher |
| Cold Sequence | 구독 시점에 데이터 발행 시작 (subscribe 전까지 아무 일도 안 일어남) |
| Hot Sequence | 구독 여부와 관계없이 데이터 발행 (실시간 이벤트) |
| 연산자 | 용도 |
|---|---|
map | 동기 1:1 변환 |
flatMap | 비동기 1:N 변환 (순서 미보장) |
concatMap | 비동기 1:N 변환 (순서 보장) |
zip | 여러 Publisher를 조합 |
switchIfEmpty | 빈 결과 시 대체 Publisher |
publishOn | 다운스트림 실행 스레드 변경 |
subscribeOn | 소스 구독 스레드 변경 |
onErrorReturn | 에러 시 기본값 |
onErrorResume | 에러 시 대체 Publisher |
retry / retryWhen | 에러 시 재시도 |
주의할 점
parallel 스케줄러에서 블로킹 호출을 하면 안 됩니다
Schedulers.parallel()은 CPU 코어 수만큼의 스레드를 사용합니다. 여기서 JDBC 호출 같은 블로킹 작업을 하면 ** 모든 parallel 스레드가 고갈 **되어 전체 파이프라인이 멈춰요. 블로킹 작업은 반드시 Schedulers.boundedElastic()에서 실행해야 합니다.
subscribe() 없이는 아무 일도 일어나지 않아요
Reactor의 Mono/Flux는 cold 시퀀스 예요. subscribe()를 호출하기 전에는 파이프라인이 실행되지 않습니다. 이걸 모르면 "코드를 다 짰는데 왜 동작을 안 하지?"라는 상황에 빠지게 돼요.
switchIfEmpty에서 Mono.defer()를 빠뜨리면 의도치 않은 실행이 발생해요
switchIfEmpty(Mono.just(createExpensiveObject())) 형태로 쓰면, 값이 있든 없든 createExpensiveObject()가 즉시 실행 됩니다. Mono.defer()로 감싸야 비어있을 때만 실행돼요.
디버깅은 log() 연산자로
리액티브 파이프라인의 스택 트레이스는 일반 코드보다 훨씬 복잡해요. log() 연산자를 파이프라인 중간에 끼워 넣으면 구독, request, onNext 등의 흐름을 확인할 수 있습니다.
Flux.range(1, 5)
.log("range")
.map(n -> n * 2)
.log("mapped")
.subscribe();
정리
| 개념 | 핵심 정리 |
|---|---|
| Reactive Streams | Publisher-Subscriber 패턴 + 배압. Java 9에서 java.util.concurrent.Flow로 편입 |
| ** 배압** | Subscriber가 request(n)으로 처리 가능한 만큼만 요청하는 메커니즘 |
| Mono / Flux | 0 |
| map vs flatMap | 동기 1:1 변환 vs 비동기 1:N 변환(순서 미보장). 비동기 호출에는 flatMap |
| publishOn vs subscribeOn | 다운스트림 실행 스레드 변경 vs 소스 구독 스레드 변경 |
| ** 에러 처리** | try-catch 불가. onErrorReturn, onErrorResume, retry 사용 |
| Virtual Threads | Java 21+에서 블로킹 코드로 비슷한 처리량 달성. 배압은 직접 구현 필요 |