Stream — Kafka 스타일의 로그 기반 자료구조
메시지를 보내면 사라지는 Pub/Sub과 달리, 메시지 이력을 남기면서도 여러 소비자가 분산 처리할 수 있는 방법은 없을까요?
개념 정의
Redis Stream은 시간순으로 메시지를 추가(append-only)하고, Consumer Group을 통해 여러 소비자가 분산 처리할 수 있는 로그 기반 자료구조 입니다. Kafka의 토픽-파티션-컨슈머 그룹과 유사한 개념을 Redis 안에서 구현한 것입니다.
왜 필요한가
- **Pub/Sub의 한계 **: 구독자가 없으면 메시지가 사라집니다. 연결이 끊기면 그 사이 메시지를 잃습니다
- **List의 한계 **: LPUSH/BRPOP으로 큐를 만들 수 있지만, 한 메시지를 여러 소비자가 받거나, 처리 실패 시 재시도하기 어렵습니다
Pub/Sub은 메시지를 저장하지 않고 List는 분산 소비를 지원하지 않기 ** 때문에 , ** 따라서 메시지 저장 + Consumer Group + ACK/재처리를 모두 지원하는 Stream이 도입되었습니다.
기본 명령어 — XADD와 XREAD
메시지 추가 (XADD)
# Stream에 메시지 추가 — '*'는 자동 ID 생성
127.0.0.1:6379> XADD orders * product "laptop" quantity "1" user_id "1001"
"1679000000000-0" # 타임스탬프-시퀀스 형식의 ID
127.0.0.1:6379> XADD orders * product "phone" quantity "2" user_id "1002"
"1679000000001-0"
# 최대 길이 제한 (대략적)
127.0.0.1:6379> XADD orders MAXLEN ~ 10000 * product "tablet" quantity "1" user_id "1003"
"1679000000002-0"
Stream ID는 밀리초타임스탬프-시퀀스번호 형식입니다. 같은 밀리초에 여러 메시지가 들어오면 시퀀스 번호가 증가합니다.
메시지 읽기 (XREAD)
# 처음부터 2개 읽기
127.0.0.1:6379> XREAD COUNT 2 STREAMS orders 0
1) 1) "orders"
2) 1) 1) "1679000000000-0"
2) 1) "product" 2) "laptop" 3) "quantity" 4) "1" 5) "user_id" 6) "1001"
2) 1) "1679000000001-0"
2) 1) "product" 2) "phone" 3) "quantity" 4) "2" 5) "user_id" 6) "1002"
# 블로킹 읽기 — 새 메시지가 올 때까지 대기
127.0.0.1:6379> XREAD BLOCK 5000 COUNT 1 STREAMS orders $
# '$'는 현재 시점 이후의 새 메시지만 읽겠다는 의미
# 5000ms 동안 대기
범위 읽기 (XRANGE / XREVRANGE)
# 전체 범위
127.0.0.1:6379> XRANGE orders - +
# 특정 시간 범위
127.0.0.1:6379> XRANGE orders 1679000000000 1679000000002
# 역순으로 최근 5개
127.0.0.1:6379> XREVRANGE orders + - COUNT 5
Consumer Group — 분산 처리의 핵심
Consumer Group을 사용하면 같은 Stream의 메시지를 여러 Consumer가 나눠서 처리할 수 있습니다. 각 메시지는 그룹 내에서 하나의 Consumer에게만 전달됩니다.
Consumer Group 생성
# 처음부터 모든 메시지 처리
127.0.0.1:6379> XGROUP CREATE orders order-processors 0
# 지금 이후의 새 메시지부터 처리
127.0.0.1:6379> XGROUP CREATE orders order-processors $ MKSTREAM
# Stream이 없으면 자동 생성하면서 그룹 만들기
127.0.0.1:6379> XGROUP CREATE orders new-group 0 MKSTREAM
Consumer Group으로 읽기 (XREADGROUP)
# Consumer 'worker-1'이 새 메시지 읽기
127.0.0.1:6379> XREADGROUP GROUP order-processors worker-1 COUNT 1 STREAMS orders >
# Consumer 'worker-2'가 새 메시지 읽기 (다른 메시지가 전달됨)
127.0.0.1:6379> XREADGROUP GROUP order-processors worker-2 COUNT 1 STREAMS orders >
# 블로킹 모드
127.0.0.1:6379> XREADGROUP GROUP order-processors worker-1 BLOCK 5000 COUNT 1 STREAMS orders >
>는 "아직 어떤 Consumer에게도 전달되지 않은 새 메시지"를 의미합니다.
ACK와 PEL — 메시지 처리 보장
처리 완료 알림 (XACK)
# 메시지 처리 완료
127.0.0.1:6379> XACK orders order-processors 1679000000000-0
(integer) 1
XACK를 호출하지 않으면 해당 메시지는 PEL(Pending Entries List)에 남아 있습니다.
미처리 메시지 확인 (XPENDING)
# 그룹의 pending 상태 요약
127.0.0.1:6379> XPENDING orders order-processors
1) (integer) 3 # pending 메시지 수
2) "1679000000000-0" # 가장 작은 ID
3) "1679000000002-0" # 가장 큰 ID
4) 1) 1) "worker-1"
2) "2" # worker-1이 2개 pending
2) 1) "worker-2"
2) "1" # worker-2가 1개 pending
# 상세 조회
127.0.0.1:6379> XPENDING orders order-processors - + 10
1) 1) "1679000000000-0"
2) "worker-1"
3) (integer) 60000 # idle 시간 (ms)
4) (integer) 1 # 전달 횟수
메시지 재할당 (XCLAIM)
Consumer가 죽었을 때 다른 Consumer가 미처리 메시지를 가져갈 수 있습니다.
# idle 시간이 30초 이상인 메시지를 worker-3이 가져감
127.0.0.1:6379> XCLAIM orders order-processors worker-3 30000 1679000000000-0
XAUTOCLAIM (Redis 6.2+)
XPENDING + XCLAIM을 한 번에 처리합니다.
# idle 30초 이상인 메시지를 자동으로 가져오기
127.0.0.1:6379> XAUTOCLAIM orders order-processors worker-3 30000 0-0 COUNT 5
Stream 관리
길이 제한 (XTRIM)
# 최대 10000개 유지 (정확)
127.0.0.1:6379> XTRIM orders MAXLEN 10000
# 대략적 제한 (성능 우선)
127.0.0.1:6379> XTRIM orders MAXLEN ~ 10000
# ID 기반 제한 — 특정 ID 이전 메시지 삭제
127.0.0.1:6379> XTRIM orders MINID 1679000000000-0
개별 메시지 삭제
127.0.0.1:6379> XDEL orders 1679000000000-0
(integer) 1
Stream 정보 조회
127.0.0.1:6379> XINFO STREAM orders
127.0.0.1:6379> XINFO GROUPS orders
127.0.0.1:6379> XINFO CONSUMERS orders order-processors
Stream vs Pub/Sub vs List
| 특성 | Stream | Pub/Sub | List |
|---|---|---|---|
| 메시지 저장 | 영속적 (명시적 삭제 전까지) | 저장 안 함 | 저장하지만 POP하면 삭제 |
| 소비자 그룹 | Consumer Group 지원 | 없음 | 없음 |
| 메시지 재처리 | XPENDING/XCLAIM으로 가능 | 불가 | 불가 |
| 메시지 이력 | XRANGE로 과거 조회 가능 | 불가 | 불가 |
| 브로드캐스트 | 여러 그룹이 같은 메시지 수신 | 모든 구독자에게 전달 | 한 Consumer만 수신 |
| 블로킹 읽기 | XREAD BLOCK | 기본 동작 | BRPOP |
언제 무엇을 쓸까
- Pub/Sub: 실시간 알림, 채팅처럼 메시지 유실이 허용되는 경우
- List: 단순 작업 큐, 한 Worker만 처리하면 되는 경우
- Stream: 메시지 이력 보존, 분산 처리, 재처리가 필요한 경우
Java(Spring) 코드 예제
메시지 발행은 opsForStream().add()로 수행합니다.
@Service
public class OrderStreamService {
private final StringRedisTemplate redisTemplate;
public RecordId publishOrder(String product, int quantity) {
Map<String, String> fields = Map.of(
"product", product,
"quantity", String.valueOf(quantity)
);
StringRecord record = StreamRecords.string(fields)
.withStreamKey("orders");
return redisTemplate.opsForStream().add(record);
}
}
Consumer Group 리스너는 StreamListener 인터페이스를 구현합니다. ACK는 StreamMessageListenerContainer가 자동으로 처리합니다.
@Component
public class OrderStreamListener
implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
Map<String, String> data = message.getValue();
System.out.println("주문 처리: " + data.get("product"));
}
}
함정/Pitfall
1. XACK를 빠뜨리면 PEL이 무한히 쌓인다
메시지를 소비한 후 XACK를 호출하지 않으면 PEL(Pending Entries List)에 메시지가 계속 남습니다. PEL이 커지면 메모리를 차지하고, XPENDING 조회 비용도 증가합니다. 반드시 처리 완료 후 XACK를 호출해야 합니다.
2. XTRIM 없이 Stream을 방치하면 메모리가 계속 증가한다
Stream은 append-only 구조이므로, 명시적으로 XTRIM이나 MAXLEN을 지정하지 않으면 메시지가 영원히 누적됩니다. XADD 시 MAXLEN ~ N 옵션을 함께 사용하는 것이 좋습니다.
3. Consumer가 죽으면 할당된 메시지가 다른 Consumer에게 자동으로 넘어가지 않는다
Consumer Group에서 특정 Consumer에게 전달된 메시지는 해당 Consumer의 PEL에 남습니다. 다른 Consumer가 이 메시지를 처리하려면 XCLAIM 이나 XAUTOCLAIM 을 명시적으로 호출해야 합니다.
정리
| 항목 | 핵심 내용 |
|---|---|
| 핵심 가치 | Pub/Sub의 메시지 유실 + List의 단일 소비자 한계를 모두 해결 |
| Consumer Group | 같은 Stream의 메시지를 여러 Consumer가 분산 처리 |
| ACK/PEL | XACK로 처리 완료 알림, 미처리 메시지는 PEL에 보관 |
| 재처리 | XCLAIM/XAUTOCLAIM으로 장애 Consumer의 메시지 재할당 |
| 적합 범위 | 중간 규모 메시징, Kafka보다 간단. 대규모면 Kafka 고려 |