Stream Gatherers — 커스텀 중간 연산을 정의하는 방법

Stream API에 "내가 원하는 중간 연산"을 추가할 수 없었던 이유

Java 8의 Stream API는 혁신적이었지만, 한 가지 아쉬운 점이 있었습니다. 종단 연산(terminal operation) 은 Collector를 통해 자유롭게 커스텀할 수 있는 반면, 중간 연산(intermediate operation) 은 filter, map, flatMap, sorted, distinct 등 미리 정해진 것만 사용할 수 있었습니다.

JAVA
// 종단 연산: Collector로 자유롭게 커스텀 가능
stream.collect(Collectors.groupingBy(...));
stream.collect(myCustomCollector());

// 중간 연산: 정해진 것만 사용 가능
stream.filter(...)
      .map(...)
      .sorted(...)  // 여기에 커스텀 중간 연산을 끼워넣을 방법이 없었음
      .collect(...);

예를 들어 "3개씩 묶어서 처리", "이전 값과 비교해서 변화가 있을 때만 통과", "누적 합계를 중간 결과로" 같은 연산은 기존 API로 깔끔하게 표현할 수 없었습니다.

Java 22에서 프리뷰로 도입된 Stream Gatherers(JEP 473)가 이 문제를 해결합니다.


Gatherer란?

Gatherer는 ** 스트림의 커스텀 중간 연산 **을 정의하는 인터페이스입니다. Collector가 종단 연산의 커스텀 버전이라면, Gatherer는 중간 연산의 커스텀 버전입니다.

PLAINTEXT
Collector : collect() = Gatherer : gather()
(종단 연산 커스텀)       (중간 연산 커스텀)

Stream.gather() 메서드를 통해 파이프라인에 끼워넣습니다:

JAVA
stream.gather(myCustomGatherer)   // 커스텀 중간 연산
      .filter(...)
      .collect(...);

Gatherer의 4가지 구성 요소

Gatherer는 Collector와 유사한 구조를 가집니다:

JAVA
public interface Gatherer<T, A, R> {
    // T: 입력 타입, A: 중간 상태 타입, R: 출력 타입

    Supplier<A> initializer();          // 1. 초기 상태 생성
    Integrator<A, T, R> integrator();   // 2. 각 요소 처리
    BinaryOperator<A> combiner();       // 3. 병렬 스트림 결합
    BiConsumer<A, Downstream<R>> finisher(); // 4. 마무리 처리
}
구성 요소역할Collector 대응
initializer상태 객체 초기화supplier
integrator요소 하나를 받아 처리, 다운스트림으로 전달accumulator
combiner병렬 스트림에서 상태 병합combiner
finisher마지막 요소 처리 후 남은 상태 정리finisher

Integrator — 핵심 처리 로직

Integrator는 세 가지를 받습니다:

JAVA
@FunctionalInterface
interface Integrator<A, T, R> {
    boolean integrate(A state, T element, Downstream<R> downstream);
    // state: 현재 상태
    // element: 현재 스트림 요소
    // downstream: 결과를 다음 단계로 보내는 인터페이스
    // 반환값: true면 계속, false면 스트림 조기 종료
}

downstream.push(value)로 결과를 다음 연산으로 전달합니다. false를 반환하면 스트림을 조기에 종료할 수 있습니다(short-circuiting).


내장 Gatherers — 바로 쓸 수 있는 것들

java.util.stream.Gatherers 클래스에 유용한 팩토리 메서드가 제공됩니다.

windowFixed — 고정 크기 윈도우

JAVA
// 3개씩 묶기
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7)
    .gather(Gatherers.windowFixed(3))
    .toList();
// [[1, 2, 3], [4, 5, 6], [7]]

배치 처리에 유용합니다. API 호출을 100개씩 묶어서 보낸다거나, 데이터를 청크 단위로 처리할 때 쓰입니다.

windowSliding — 슬라이딩 윈도우

JAVA
// 크기 3의 슬라이딩 윈도우
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.windowSliding(3))
    .toList();
// [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

이동 평균, 연속 패턴 감지 등에 유용합니다.

fold — 누적 중간 결과

JAVA
// 누적 합계를 중간 결과로 내보냄
List<Integer> runningSums = Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.fold(() -> 0, Integer::sum))
    .toList();
// [15] — fold는 최종 결과만 하나 내보냄

fold는 모든 요소를 처리한 후 ** 최종 결과 하나 **만 내보냅니다. 중간 누적값이 필요하면 scan을 사용하세요.

scan — 누적 중간 결과를 매번 출력

JAVA
// 누적 합계를 매 단계마다 출력
List<Integer> runningSums = Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.scan(() -> 0, Integer::sum))
    .toList();
// [1, 3, 6, 10, 15]

scanfold와 달리 각 단계의 누적 결과를 모두 다운스트림으로 전달합니다.

mapConcurrent — 동시 매핑 (Virtual Threads)

JAVA
// 최대 5개를 동시에 처리하되 순서 유지
List<String> results = Stream.of("url1", "url2", "url3", "url4", "url5")
    .gather(Gatherers.mapConcurrent(5, url -> fetchData(url)))
    .toList();

mapConcurrentVirtual Threads를 사용해 최대 N개 요소를 동시에 매핑 합니다. 놀라운 점은 입력 순서를 유지 한다는 것입니다. I/O 바운드 작업(HTTP 호출, DB 쿼리)에서 극적인 성능 향상을 줍니다.


커스텀 Gatherer 만들기

예제 1: distinctBy — 특정 필드 기준 중복 제거

기존 distinct()equals()로만 비교합니다. 특정 필드 기준으로 중복을 제거하려면?

JAVA
public static <T, K> Gatherer<T, ?, T> distinctBy(Function<T, K> keyExtractor) {
    return Gatherer.ofSequential(
        // initializer: 이미 본 키를 저장할 Set
        HashSet<K>::new,

        // integrator: 새 키면 통과, 이미 본 키면 무시
        (seen, element, downstream) -> {
            K key = keyExtractor.apply(element);
            if (seen.add(key)) {
                downstream.push(element);
            }
            return true;  // 계속 처리
        }
    );
}

// 사용
record Employee(String dept, String name) {}

var employees = List.of(
    new Employee("개발", "Alice"),
    new Employee("개발", "Bob"),
    new Employee("인사", "Charlie"),
    new Employee("인사", "Dave")
);

// 부서별 첫 번째 직원만
var firstPerDept = employees.stream()
    .gather(distinctBy(Employee::dept))
    .toList();
// [Employee[dept=개발, name=Alice], Employee[dept=인사, name=Charlie]]

예제 2: takeWhileChanged — 값이 변할 때까지만 가져오기

JAVA
public static <T> Gatherer<T, ?, T> takeWhileChanged(
        Function<T, ?> keyExtractor) {

    // 상태: 이전 키를 추적
    class State {
        Object previousKey = new Object();  // 센티넬 값 (아직 시작 안 함)
        boolean started = false;
    }

    return Gatherer.ofSequential(
        State::new,
        (state, element, downstream) -> {
            Object currentKey = keyExtractor.apply(element);
            if (!state.started) {
                state.started = true;
                state.previousKey = currentKey;
                return downstream.push(element);
            }
            if (Objects.equals(state.previousKey, currentKey)) {
                return downstream.push(element);
            }
            return false;  // 키가 변했으면 스트림 종료
        }
    );
}

// 같은 카테고리의 연속된 항목만 가져오기
var items = List.of("A:1", "A:2", "A:3", "B:4", "A:5");
var result = items.stream()
    .gather(takeWhileChanged(s -> s.split(":")[0]))
    .toList();
// [A:1, A:2, A:3]  — B에서 키가 변하므로 종료

예제 3: zipWithIndex — 인덱스 부여

JAVA
public record Indexed<T>(int index, T value) {}

public static <T> Gatherer<T, ?, Indexed<T>> zipWithIndex() {
    return Gatherer.ofSequential(
        // initializer: 카운터
        () -> new int[]{0},

        // integrator: 인덱스를 붙여서 전달
        (counter, element, downstream) -> {
            downstream.push(new Indexed<>(counter[0]++, element));
            return true;
        }
    );
}

// 사용
List.of("Apple", "Banana", "Cherry").stream()
    .gather(zipWithIndex())
    .forEach(System.out::println);
// Indexed[index=0, value=Apple]
// Indexed[index=1, value=Banana]
// Indexed[index=2, value=Cherry]

Gatherer 생성 헬퍼 메서드

팩토리 메서드상태병렬 지원용도
Gatherer.of(init, integrator, combiner, finisher)완전한 병렬 Gatherer
Gatherer.ofSequential(init, integrator)순차 처리, 상태 있음
Gatherer.ofSequential(init, integrator, finisher)순차 + 마무리 처리
Gatherer.of(integrator)상태 없는 단순 변환

대부분의 경우 Gatherer.ofSequential()로 충분합니다.


실전 활용 패턴

배치 API 호출

JAVA
public List<Response> batchProcess(List<Request> requests) {
    return requests.stream()
        .gather(Gatherers.windowFixed(100))        // 100개씩 묶기
        .gather(Gatherers.mapConcurrent(3,         // 최대 3 배치 동시 호출
            batch -> apiClient.bulkSend(batch)))
        .flatMap(List::stream)
        .toList();
}

이동 평균 계산

JAVA
public static Gatherer<Double, ?, Double> movingAverage(int windowSize) {
    return Gatherer.<Double, LinkedList<Double>, Double>ofSequential(
        LinkedList::new,
        (window, value, downstream) -> {
            window.addLast(value);
            if (window.size() > windowSize) {
                window.removeFirst();
            }
            if (window.size() == windowSize) {
                double avg = window.stream()
                    .mapToDouble(Double::doubleValue)
                    .average()
                    .orElse(0);
                downstream.push(avg);
            }
            return true;
        }
    );
}

// 3일 이동 평균
Stream.of(10.0, 20.0, 30.0, 40.0, 50.0)
    .gather(movingAverage(3))
    .forEach(System.out::println);
// 20.0 (10+20+30)/3
// 30.0 (20+30+40)/3
// 40.0 (30+40+50)/3

Collector와의 비교

특성CollectorGatherer
위치종단 연산 (collect())중간 연산 (gather())
출력최종 결과 하나0개 이상의 요소를 스트림으로
조기 종료불가가능 (false 반환)
다운스트림 제어없음Downstream.push()
파이프라인 체이닝마지막에만어디든 끼워넣기 가능

주의할 점

Preview API

Stream Gatherers는 Java 22에서 프리뷰로 도입되었습니다. 사용하려면:

BASH
javac --enable-preview --source 25 MyApp.java
java --enable-preview MyApp

병렬 스트림에서의 주의

Gatherer.ofSequential()로 만든 Gatherer는 순차 스트림에서만 올바르게 동작합니다. 병렬 스트림을 지원하려면 Gatherer.of()combiner를 제공해야 합니다.

JAVA
// 병렬 지원 Gatherer
Gatherer.of(
    initializer,
    integrator,
    combiner,     // 병렬 처리 시 상태 병합 로직
    finisher
);

gather() 체이닝

여러 Gatherer를 체이닝할 수 있습니다:

JAVA
stream.gather(windowFixed(5))
      .gather(mapConcurrent(3, this::process))
      .gather(distinctBy(Result::id))
      .toList();

gather()는 독립적인 중간 연산이므로 순서대로 적용됩니다.


정리

Stream Gatherers는 Collector가 종단 연산에 해준 것을 중간 연산에 해주는 기능입니다. 윈도우, 스캔, 동시 매핑 같은 복잡한 스트림 처리를 깔끔한 파이프라인으로 표현할 수 있게 됩니다.

내장 Gatherer용도
windowFixed(n)n개씩 고정 크기로 묶기
windowSliding(n)n개씩 슬라이딩 윈도우
fold(init, op)전체를 하나로 접기
scan(init, op)누적 결과를 매번 출력
mapConcurrent(n, fn)최대 n개 동시 매핑 (Virtual Threads)

공부하면서 느낀 건, Gatherer가 단순히 "새 API"가 아니라 Stream의 확장성 자체를 한 단계 끌어올렸다는 점입니다. 지금까지 for 루프로 빠져나와야 했던 복잡한 스트림 처리(윈도우, 상태 기반 필터링, 동시 매핑)를 이제 파이프라인 안에서 해결할 수 있습니다.

댓글 로딩 중...