서비스 간 통신이 모두 HTTP 동기 호출이라면, 하나가 느려질 때 다른 서비스도 같이 느려지지 않을까요?

동기 HTTP 통신은 호출한 서비스가 응답을 기다려야 합니다. 이벤트 기반 비동기 통신은 메시지 브로커를 중간에 두어 서비스 간 결합도를 낮추고, 장애 전파를 방지합니다. Spring Cloud Stream은 이런 이벤트 기반 아키텍처를 손쉽게 구현할 수 있는 프레임워크입니다.

Spring Cloud Stream이란

Spring Cloud Stream은 메시지 브로커를 추상화하여 이벤트 기반 마이크로서비스를 구축 하는 프레임워크입니다.

핵심 아이디어:

  • Binder: 메시지 브로커(Kafka, RabbitMQ 등)와의 연결을 추상화
  • ** 함수형 모델 **: Java의 Function, Consumer, Supplier로 메시지 처리
  • ** 설정 기반 바인딩 **: 코드 변경 없이 설정으로 토픽/큐 매핑
PLAINTEXT
Producer → [Binder] → Kafka/RabbitMQ → [Binder] → Consumer

의존성

XML
<!-- Kafka Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<!-- 또는 RabbitMQ Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

함수형 프로그래밍 모델

Spring Cloud Stream 3.x부터 함수형 모델이 기본입니다. 이전의 @EnableBinding, @StreamListener는 더 이상 사용하지 않습니다.

Consumer — 메시지 소비

JAVA
@Configuration
public class OrderEventHandler {

    @Bean
    public Consumer<OrderCreatedEvent> handleOrderCreated() {
        return event -> {
            log.info("주문 생성 이벤트 수신: {}", event.getOrderId());
            // 재고 차감, 알림 발송 등
            inventoryService.decreaseStock(event.getProductId(), event.getQuantity());
        };
    }
}

Supplier — 메시지 생산

JAVA
@Configuration
public class EventProducer {

    @Bean
    public Supplier<OrderCreatedEvent> orderEventSupplier() {
        return () -> {
            // 주기적으로 호출됨 (기본 1초)
            return new OrderCreatedEvent(generateOrderId(), "PRODUCT-001", 1);
        };
    }
}

하지만 실무에서는 Supplier보다 StreamBridge를 사용하여 필요한 시점에 메시지를 보내는 것이 일반적입니다.

StreamBridge — 프로그래밍 방식 생산

JAVA
@Service
public class OrderService {

    private final StreamBridge streamBridge;

    public Order createOrder(OrderCreateRequest request) {
        Order order = orderRepository.save(request.toEntity());

        // 이벤트 발행
        OrderCreatedEvent event = new OrderCreatedEvent(
                order.getId(),
                order.getProductId(),
                order.getQuantity()
        );
        streamBridge.send("orderCreated-out-0", event);

        return order;
    }
}

Function — 메시지 변환 (Processor)

입력 메시지를 받아 가공 후 출력합니다.

JAVA
@Bean
public Function<OrderCreatedEvent, NotificationEvent> orderToNotification() {
    return orderEvent -> {
        log.info("주문 이벤트를 알림 이벤트로 변환: {}", orderEvent.getOrderId());
        return new NotificationEvent(
                orderEvent.getOrderId(),
                "주문이 생성되었습니다",
                NotificationType.EMAIL
        );
    };
}

바인딩 설정

함수 이름과 메시지 브로커의 토픽/큐를 매핑합니다.

YAML
spring:
  cloud:
    stream:
      function:
        definition: handleOrderCreated;orderToNotification
      bindings:
        # Consumer 바인딩
        handleOrderCreated-in-0:
          destination: order-events        # Kafka 토픽 / RabbitMQ 큐 이름
          group: inventory-service          # 컨슈머 그룹
          content-type: application/json

        # Function 입력 바인딩
        orderToNotification-in-0:
          destination: order-events
          group: notification-service

출력 바인딩과 브로커 설정도 같은 구조로 추가합니다.

YAML
        # Function 출력 바인딩
        orderToNotification-out-0:
          destination: notification-events

        # StreamBridge 출력 바인딩
        orderCreated-out-0:
          destination: order-events

      kafka:
        binder:
          brokers: localhost:9092

바인딩 이름 규칙

PLAINTEXT
{함수이름}-{in|out}-{인덱스}
  • handleOrderCreated-in-0: handleOrderCreated 함수의 첫 번째 입력
  • orderToNotification-out-0: orderToNotification 함수의 첫 번째 출력

컨슈머 그룹

같은 토픽을 여러 서비스 인스턴스가 구독할 때, ** 컨슈머 그룹을 설정하면 그룹 내에서 하나의 인스턴스만 메시지를 받습니다 **.

YAML
bindings:
  handleOrderCreated-in-0:
    destination: order-events
    group: inventory-service  # 같은 그룹의 인스턴스 중 하나만 처리

컨슈머 그룹이 없으면 모든 인스턴스가 같은 메시지를 중복 처리하게 됩니다.

파티셔닝

같은 키의 메시지가 항상 같은 파티션(컨슈머)으로 전달되어 ** 메시지 순서를 보장 **합니다.

프로듀서 설정

YAML
spring:
  cloud:
    stream:
      bindings:
        orderCreated-out-0:
          destination: order-events
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 3
JAVA
streamBridge.send("orderCreated-out-0",
    MessageBuilder.withPayload(event)
            .setHeader("partitionKey", event.getOrderId().toString())
            .build());

컨슈머 설정

YAML
spring:
  cloud:
    stream:
      bindings:
        handleOrderCreated-in-0:
          destination: order-events
          group: order-processor
          consumer:
            partitioned: true
      instance-count: 3    # 전체 인스턴스 수
      instance-index: 0    # 이 인스턴스의 인덱스 (0, 1, 2)

에러 처리와 DLQ

리트라이

YAML
spring:
  cloud:
    stream:
      bindings:
        handleOrderCreated-in-0:
          consumer:
            max-attempts: 3          # 최대 3번 시도
            back-off-initial-interval: 1000
            back-off-multiplier: 2.0

DLQ (Dead Letter Queue)

재시도 후에도 처리 실패한 메시지를 별도 큐에 보관합니다.

Kafka:

YAML
spring:
  cloud:
    stream:
      kafka:
        bindings:
          handleOrderCreated-in-0:
            consumer:
              enableDlq: true
              dlqName: order-events-dlq  # DLQ 토픽 이름
              autoCommitOnError: false

RabbitMQ:

YAML
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          handleOrderCreated-in-0:
            consumer:
              auto-bind-dlq: true
              dlq-ttl: 300000        # DLQ 메시지 TTL (5분)
              republishToDlq: true   # 에러 정보 포함하여 DLQ에 발행

DLQ 메시지 처리

JAVA
@Bean
public Consumer<Message<OrderCreatedEvent>> handleDlq() {
    return message -> {
        log.error("DLQ 메시지 수신: {}", message.getPayload());
        log.error("에러 원인: {}", message.getHeaders().get("x-exception-message"));

        // 슬랙 알림, DB 저장 등
        alertService.sendDlqAlert(message);
    };
}

Binder 교체 — Kafka에서 RabbitMQ로

비즈니스 코드 변경 없이 의존성과 설정만 변경하면 됩니다.

XML
<!-- Kafka Binder를 제거하고 -->
<!-- RabbitMQ Binder를 추가 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
YAML
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

Consumer<OrderCreatedEvent> 코드는 그대로입니다. Binder 추상화 덕분입니다.

실무 팁

  • ** 멱등성 **을 반드시 보장하세요. 메시지가 중복 전달될 수 있습니다 (at-least-once)
  • 이벤트 페이로드에 ** 최소한의 데이터 **만 담고, 상세 정보는 API로 조회하세요
  • ** 컨슈머 그룹 **을 설정하지 않으면 인스턴스 수만큼 중복 처리가 발생합니다
  • DLQ 메시지를 ** 모니터링하고 알림 **을 설정하여 처리 실패를 빠르게 감지하세요
  • 이벤트 스키마 변경 시 ** 하위 호환성 **을 유지하세요 (새 필드 추가는 OK, 기존 필드 삭제는 위험)

주의할 점

1. 컨슈머 그룹을 설정하지 않으면 인스턴스 수만큼 메시지가 중복 처리된다

컨슈머 그룹 없이 같은 토픽을 여러 인스턴스가 구독하면, 모든 인스턴스가 동일한 메시지를 받아 처리합니다. 예를 들어 주문 이벤트를 3개 인스턴스가 구독하면 3번 결제가 일어날 수 있습니다. 반드시 group 속성을 설정하여 그룹 내 하나의 인스턴스만 메시지를 처리하도록 해야 합니다.

2. 이벤트 스키마를 변경할 때 하위 호환성을 깨면 컨슈머가 역직렬화에 실패한다

프로듀서가 이벤트에 새 필드를 추가하는 것은 괜찮지만, 기존 필드를 삭제하거나 타입을 변경하면 아직 업데이트되지 않은 컨슈머에서 역직렬화 에러가 발생합니다. 모든 컨슈머가 동시에 배포되는 것이 아니므로, 스키마 변경은 항상 하위 호환성을 유지해야 합니다.

3. DLQ 메시지를 모니터링하지 않으면 처리 실패가 조용히 묻힌다

DLQ에 메시지가 쌓여도 별도 모니터링이 없으면 아무도 눈치채지 못합니다. 결제 이벤트가 DLQ에 빠졌다면 실제로는 결제가 누락된 것인데, 서비스는 정상적으로 동작하는 것처럼 보입니다. DLQ 메시지 수에 대한 알림을 반드시 설정하고, 재처리 또는 수동 개입 프로세스를 마련하세요.

정리

  • Spring Cloud Stream은 Binder 추상화 로 메시지 브로커와 비즈니스 코드를 분리합니다
  • **함수형 모델 **(Consumer, Supplier, Function)로 메시지 처리를 간결하게 구현합니다
  • ** 파티셔닝 **으로 메시지 순서를 보장하고, DLQ 로 실패 메시지를 안전하게 관리합니다
  • 컨슈머 그룹 설정과 멱등성 보장은 실무에서 필수입니다
댓글 로딩 중...