Stream Gatherers — 커스텀 중간 연산을 정의하는 방법
Stream Gatherers — 커스텀 중간 연산을 정의하는 방법
Stream API에 "내가 원하는 중간 연산"을 추가할 수 없었던 이유
Java 8의 Stream API는 혁신적이었지만, 한 가지 아쉬운 점이 있었습니다. 종단 연산(terminal operation) 은 Collector를 통해 자유롭게 커스텀할 수 있는 반면, 중간 연산(intermediate operation) 은 filter, map, flatMap, sorted, distinct 등 미리 정해진 것만 사용할 수 있었습니다.
// 종단 연산: Collector로 자유롭게 커스텀 가능
stream.collect(Collectors.groupingBy(...));
stream.collect(myCustomCollector());
// 중간 연산: 정해진 것만 사용 가능
stream.filter(...)
.map(...)
.sorted(...) // 여기에 커스텀 중간 연산을 끼워넣을 방법이 없었음
.collect(...);
예를 들어 "3개씩 묶어서 처리", "이전 값과 비교해서 변화가 있을 때만 통과", "누적 합계를 중간 결과로" 같은 연산은 기존 API로 깔끔하게 표현할 수 없었습니다.
Java 22에서 프리뷰로 도입된 Stream Gatherers(JEP 473)가 이 문제를 해결합니다.
Gatherer란?
Gatherer는 ** 스트림의 커스텀 중간 연산 **을 정의하는 인터페이스입니다. Collector가 종단 연산의 커스텀 버전이라면, Gatherer는 중간 연산의 커스텀 버전입니다.
Collector : collect() = Gatherer : gather()
(종단 연산 커스텀) (중간 연산 커스텀)
Stream.gather() 메서드를 통해 파이프라인에 끼워넣습니다:
stream.gather(myCustomGatherer) // 커스텀 중간 연산
.filter(...)
.collect(...);
Gatherer의 4가지 구성 요소
Gatherer는 Collector와 유사한 구조를 가집니다:
public interface Gatherer<T, A, R> {
// T: 입력 타입, A: 중간 상태 타입, R: 출력 타입
Supplier<A> initializer(); // 1. 초기 상태 생성
Integrator<A, T, R> integrator(); // 2. 각 요소 처리
BinaryOperator<A> combiner(); // 3. 병렬 스트림 결합
BiConsumer<A, Downstream<R>> finisher(); // 4. 마무리 처리
}
| 구성 요소 | 역할 | Collector 대응 |
|---|---|---|
initializer | 상태 객체 초기화 | supplier |
integrator | 요소 하나를 받아 처리, 다운스트림으로 전달 | accumulator |
combiner | 병렬 스트림에서 상태 병합 | combiner |
finisher | 마지막 요소 처리 후 남은 상태 정리 | finisher |
Integrator — 핵심 처리 로직
Integrator는 세 가지를 받습니다:
@FunctionalInterface
interface Integrator<A, T, R> {
boolean integrate(A state, T element, Downstream<R> downstream);
// state: 현재 상태
// element: 현재 스트림 요소
// downstream: 결과를 다음 단계로 보내는 인터페이스
// 반환값: true면 계속, false면 스트림 조기 종료
}
downstream.push(value)로 결과를 다음 연산으로 전달합니다. false를 반환하면 스트림을 조기에 종료할 수 있습니다(short-circuiting).
내장 Gatherers — 바로 쓸 수 있는 것들
java.util.stream.Gatherers 클래스에 유용한 팩토리 메서드가 제공됩니다.
windowFixed — 고정 크기 윈도우
// 3개씩 묶기
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7)
.gather(Gatherers.windowFixed(3))
.toList();
// [[1, 2, 3], [4, 5, 6], [7]]
배치 처리에 유용합니다. API 호출을 100개씩 묶어서 보낸다거나, 데이터를 청크 단위로 처리할 때 쓰입니다.
windowSliding — 슬라이딩 윈도우
// 크기 3의 슬라이딩 윈도우
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5)
.gather(Gatherers.windowSliding(3))
.toList();
// [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
이동 평균, 연속 패턴 감지 등에 유용합니다.
fold — 누적 중간 결과
// 누적 합계를 중간 결과로 내보냄
List<Integer> runningSums = Stream.of(1, 2, 3, 4, 5)
.gather(Gatherers.fold(() -> 0, Integer::sum))
.toList();
// [15] — fold는 최종 결과만 하나 내보냄
fold는 모든 요소를 처리한 후 ** 최종 결과 하나 **만 내보냅니다. 중간 누적값이 필요하면scan을 사용하세요.
scan — 누적 중간 결과를 매번 출력
// 누적 합계를 매 단계마다 출력
List<Integer> runningSums = Stream.of(1, 2, 3, 4, 5)
.gather(Gatherers.scan(() -> 0, Integer::sum))
.toList();
// [1, 3, 6, 10, 15]
scan은 fold와 달리 각 단계의 누적 결과를 모두 다운스트림으로 전달합니다.
mapConcurrent — 동시 매핑 (Virtual Threads)
// 최대 5개를 동시에 처리하되 순서 유지
List<String> results = Stream.of("url1", "url2", "url3", "url4", "url5")
.gather(Gatherers.mapConcurrent(5, url -> fetchData(url)))
.toList();
mapConcurrent는 Virtual Threads를 사용해 최대 N개 요소를 동시에 매핑 합니다. 놀라운 점은 입력 순서를 유지 한다는 것입니다. I/O 바운드 작업(HTTP 호출, DB 쿼리)에서 극적인 성능 향상을 줍니다.
커스텀 Gatherer 만들기
예제 1: distinctBy — 특정 필드 기준 중복 제거
기존 distinct()는 equals()로만 비교합니다. 특정 필드 기준으로 중복을 제거하려면?
public static <T, K> Gatherer<T, ?, T> distinctBy(Function<T, K> keyExtractor) {
return Gatherer.ofSequential(
// initializer: 이미 본 키를 저장할 Set
HashSet<K>::new,
// integrator: 새 키면 통과, 이미 본 키면 무시
(seen, element, downstream) -> {
K key = keyExtractor.apply(element);
if (seen.add(key)) {
downstream.push(element);
}
return true; // 계속 처리
}
);
}
// 사용
record Employee(String dept, String name) {}
var employees = List.of(
new Employee("개발", "Alice"),
new Employee("개발", "Bob"),
new Employee("인사", "Charlie"),
new Employee("인사", "Dave")
);
// 부서별 첫 번째 직원만
var firstPerDept = employees.stream()
.gather(distinctBy(Employee::dept))
.toList();
// [Employee[dept=개발, name=Alice], Employee[dept=인사, name=Charlie]]
예제 2: takeWhileChanged — 값이 변할 때까지만 가져오기
public static <T> Gatherer<T, ?, T> takeWhileChanged(
Function<T, ?> keyExtractor) {
// 상태: 이전 키를 추적
class State {
Object previousKey = new Object(); // 센티넬 값 (아직 시작 안 함)
boolean started = false;
}
return Gatherer.ofSequential(
State::new,
(state, element, downstream) -> {
Object currentKey = keyExtractor.apply(element);
if (!state.started) {
state.started = true;
state.previousKey = currentKey;
return downstream.push(element);
}
if (Objects.equals(state.previousKey, currentKey)) {
return downstream.push(element);
}
return false; // 키가 변했으면 스트림 종료
}
);
}
// 같은 카테고리의 연속된 항목만 가져오기
var items = List.of("A:1", "A:2", "A:3", "B:4", "A:5");
var result = items.stream()
.gather(takeWhileChanged(s -> s.split(":")[0]))
.toList();
// [A:1, A:2, A:3] — B에서 키가 변하므로 종료
예제 3: zipWithIndex — 인덱스 부여
public record Indexed<T>(int index, T value) {}
public static <T> Gatherer<T, ?, Indexed<T>> zipWithIndex() {
return Gatherer.ofSequential(
// initializer: 카운터
() -> new int[]{0},
// integrator: 인덱스를 붙여서 전달
(counter, element, downstream) -> {
downstream.push(new Indexed<>(counter[0]++, element));
return true;
}
);
}
// 사용
List.of("Apple", "Banana", "Cherry").stream()
.gather(zipWithIndex())
.forEach(System.out::println);
// Indexed[index=0, value=Apple]
// Indexed[index=1, value=Banana]
// Indexed[index=2, value=Cherry]
Gatherer 생성 헬퍼 메서드
| 팩토리 메서드 | 상태 | 병렬 지원 | 용도 |
|---|---|---|---|
Gatherer.of(init, integrator, combiner, finisher) | ✅ | ✅ | 완전한 병렬 Gatherer |
Gatherer.ofSequential(init, integrator) | ✅ | ❌ | 순차 처리, 상태 있음 |
Gatherer.ofSequential(init, integrator, finisher) | ✅ | ❌ | 순차 + 마무리 처리 |
Gatherer.of(integrator) | ❌ | ❌ | 상태 없는 단순 변환 |
대부분의 경우 Gatherer.ofSequential()로 충분합니다.
실전 활용 패턴
배치 API 호출
public List<Response> batchProcess(List<Request> requests) {
return requests.stream()
.gather(Gatherers.windowFixed(100)) // 100개씩 묶기
.gather(Gatherers.mapConcurrent(3, // 최대 3 배치 동시 호출
batch -> apiClient.bulkSend(batch)))
.flatMap(List::stream)
.toList();
}
이동 평균 계산
public static Gatherer<Double, ?, Double> movingAverage(int windowSize) {
return Gatherer.<Double, LinkedList<Double>, Double>ofSequential(
LinkedList::new,
(window, value, downstream) -> {
window.addLast(value);
if (window.size() > windowSize) {
window.removeFirst();
}
if (window.size() == windowSize) {
double avg = window.stream()
.mapToDouble(Double::doubleValue)
.average()
.orElse(0);
downstream.push(avg);
}
return true;
}
);
}
// 3일 이동 평균
Stream.of(10.0, 20.0, 30.0, 40.0, 50.0)
.gather(movingAverage(3))
.forEach(System.out::println);
// 20.0 (10+20+30)/3
// 30.0 (20+30+40)/3
// 40.0 (30+40+50)/3
Collector와의 비교
| 특성 | Collector | Gatherer |
|---|---|---|
| 위치 | 종단 연산 (collect()) | 중간 연산 (gather()) |
| 출력 | 최종 결과 하나 | 0개 이상의 요소를 스트림으로 |
| 조기 종료 | 불가 | 가능 (false 반환) |
| 다운스트림 제어 | 없음 | Downstream.push() |
| 파이프라인 체이닝 | 마지막에만 | 어디든 끼워넣기 가능 |
주의할 점
Preview API
Stream Gatherers는 Java 22에서 프리뷰로 도입되었습니다. 사용하려면:
javac --enable-preview --source 25 MyApp.java
java --enable-preview MyApp
병렬 스트림에서의 주의
Gatherer.ofSequential()로 만든 Gatherer는 순차 스트림에서만 올바르게 동작합니다. 병렬 스트림을 지원하려면 Gatherer.of()에 combiner를 제공해야 합니다.
// 병렬 지원 Gatherer
Gatherer.of(
initializer,
integrator,
combiner, // 병렬 처리 시 상태 병합 로직
finisher
);
gather() 체이닝
여러 Gatherer를 체이닝할 수 있습니다:
stream.gather(windowFixed(5))
.gather(mapConcurrent(3, this::process))
.gather(distinctBy(Result::id))
.toList();
각 gather()는 독립적인 중간 연산이므로 순서대로 적용됩니다.
정리
Stream Gatherers는
Collector가 종단 연산에 해준 것을 중간 연산에 해주는 기능입니다. 윈도우, 스캔, 동시 매핑 같은 복잡한 스트림 처리를 깔끔한 파이프라인으로 표현할 수 있게 됩니다.
| 내장 Gatherer | 용도 |
|---|---|
windowFixed(n) | n개씩 고정 크기로 묶기 |
windowSliding(n) | n개씩 슬라이딩 윈도우 |
fold(init, op) | 전체를 하나로 접기 |
scan(init, op) | 누적 결과를 매번 출력 |
mapConcurrent(n, fn) | 최대 n개 동시 매핑 (Virtual Threads) |
공부하면서 느낀 건, Gatherer가 단순히 "새 API"가 아니라 Stream의 확장성 자체를 한 단계 끌어올렸다는 점입니다. 지금까지 for 루프로 빠져나와야 했던 복잡한 스트림 처리(윈도우, 상태 기반 필터링, 동시 매핑)를 이제 파이프라인 안에서 해결할 수 있습니다.