Spring Boot + RabbitMQ 기본 연동
지금까지 RabbitMQ의 개념과 동작 원리를 살펴봤는데, 실제로 Spring Boot 프로젝트에서 메시지를 보내고 받으려면 어디서부터 시작해야 할까?
Spring Boot + RabbitMQ 기본 연동
spring-boot-starter-amqp 시작하기
Spring Boot에서 RabbitMQ를 사용하려면 spring-boot-starter-amqp 의존성 하나만 추가하면 됩니다. 이 스타터가 Spring AMQP와 RabbitMQ 클라이언트 라이브러리를 함께 가져오고, 자동 설정(Auto Configuration)까지 처리해줍니다.
의존성 추가
// build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
}
application.yml 설정
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
이 설정만으로 Spring Boot가 ConnectionFactory, RabbitTemplate, RabbitAdmin 빈을 자동으로 생성합니다. 별도의 @Bean 선언 없이도 바로 메시지를 발행할 수 있는 상태가 되는 것입니다.
| 자동 생성 빈 | 역할 |
|---|---|
CachingConnectionFactory | RabbitMQ 서버와의 커넥션 관리 (커넥션 풀링) |
RabbitTemplate | 메시지 발행용 템플릿 |
RabbitAdmin | Exchange, Queue, Binding 등 인프라 자동 선언 |
SimpleRabbitListenerContainerFactory | @RabbitListener 컨테이너 생성 |
Exchange, Queue, Binding 빈 선언
메시지를 주고받으려면 먼저 Exchange, Queue, 그리고 이 둘을 연결하는 Binding이 필요합니다. 이전 글에서 살펴본 Exchange 타입들을 Spring Bean으로 선언해봅시다.
@Configuration
public class RabbitConfig {
// Exchange 선언
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}
// Queue 선언
@Bean
public Queue orderQueue() {
// durable=true → 브로커 재시작 후에도 큐 유지
return QueueBuilder.durable("order.queue").build();
}
// Binding: Exchange → Queue를 라우팅 키로 연결
@Bean
public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder
.bind(orderQueue)
.to(orderExchange)
.with("order.created"); // 라우팅 키
}
}
RabbitAdmin이 애플리케이션 시작 시점에 이 빈들을 감지하여 RabbitMQ 서버에 자동으로 선언합니다. 이미 존재하는 Exchange나 Queue라면 동일한 설정인지 확인하고, 다르면 예외를 발생시킵니다.
Topic Exchange 예시
@Bean
public TopicExchange logExchange() {
return new TopicExchange("log.exchange");
}
@Bean
public Queue errorLogQueue() {
return QueueBuilder.durable("log.error.queue").build();
}
@Bean
public Binding errorLogBinding(Queue errorLogQueue, TopicExchange logExchange) {
// "log.error.*" 패턴에 매칭되는 메시지만 수신
return BindingBuilder
.bind(errorLogQueue)
.to(logExchange)
.with("log.error.*");
}
RabbitTemplate — 메시지 발행
RabbitTemplate은 RabbitMQ에 메시지를 발행하는 핵심 클래스입니다. Spring의 JdbcTemplate이나 RestTemplate과 같은 패턴으로, 반복적인 커넥션 관리와 채널 생성을 추상화해줍니다.
기본 발행
@Service
@RequiredArgsConstructor
public class OrderProducer {
private final RabbitTemplate rabbitTemplate;
public void sendOrder(OrderEvent event) {
// exchange, routingKey, 메시지 객체
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
event
);
}
}
convertAndSend() vs send()
| 메서드 | 설명 |
|---|---|
convertAndSend(exchange, routingKey, object) | MessageConverter를 통해 Java 객체를 Message로 자동 변환 후 발행 |
send(exchange, routingKey, message) | 이미 생성된 Message 객체를 직접 전달 |
대부분의 경우 convertAndSend()를 사용합니다. 객체를 직접 넘기면 등록된 MessageConverter가 알아서 직렬화해주기 때문입니다.
메시지 후처리 (MessagePostProcessor)
헤더 추가, 우선순위 지정 등 메시지를 보내기 직전에 가공이 필요하면 MessagePostProcessor를 활용합니다.
rabbitTemplate.convertAndSend("order.exchange", "order.created", event,
message -> {
// 커스텀 헤더 추가
message.getMessageProperties().setHeader("x-retry-count", 0);
// 메시지 우선순위 설정
message.getMessageProperties().setPriority(5);
return message;
});
@RabbitListener — 메시지 소비
메시지를 소비하는 쪽은 @RabbitListener 어노테이션 하나로 구현할 수 있습니다.
메서드 레벨 리스너
@Service
@Slf4j
public class OrderConsumer {
@RabbitListener(queues = "order.queue")
public void handleOrder(OrderEvent event) {
log.info("주문 이벤트 수신: {}", event.getOrderId());
// 비즈니스 로직 처리
}
}
이 메서드가 정상적으로 리턴하면 자동으로 ACK가 전송됩니다. 예외가 발생하면 메시지가 NACK 처리되어 재전달됩니다.
@RabbitListener는 내부적으로SimpleMessageListenerContainer를 생성합니다. 이 컨테이너가 별도의 스레드에서 RabbitMQ 커넥션을 유지하며 메시지를 폴링하는 구조입니다. 컨트롤러처럼 HTTP 요청을 기다리는 것이 아니라, 컨테이너가 능동적으로 큐를 구독하고 있다는 점이 핵심입니다.
클래스 레벨 리스너 + @RabbitHandler
하나의 큐에서 여러 타입의 메시지를 받아야 한다면 클래스 레벨에 @RabbitListener를 붙이고, 메서드별로 @RabbitHandler를 사용합니다.
@Service
@RabbitListener(queues = "order.queue")
public class OrderMessageHandler {
@RabbitHandler
public void handleCreated(OrderCreatedEvent event) {
// 주문 생성 이벤트 처리
}
@RabbitHandler
public void handleCancelled(OrderCancelledEvent event) {
// 주문 취소 이벤트 처리
}
}
이 패턴은 메시지의 __TypeId__ 헤더를 기반으로 적절한 핸들러 메서드를 선택합니다. JSON 직렬화 설정이 제대로 되어 있어야 타입 판별이 가능합니다.
Jackson2JsonMessageConverter — JSON 직렬화
왜 Java 직렬화 대신 JSON을 써야 하는가
Spring AMQP의 기본 MessageConverter는 SimpleMessageConverter로, Java의 Serializable 인터페이스를 이용한 직렬화를 사용합니다.
Java 기본 직렬화는 클래스의 패키지 경로, 필드 구조,
serialVersionUID가 정확히 일치해야 역직렬화가 가능합니다. 프로듀서와 컨슈머가 서로 다른 서비스라면, 양쪽 모두 동일한 클래스를 공유해야 한다는 뜻입니다. 서비스 간 결합도가 높아지고, Python이나 Node.js 같은 다른 언어로 작성된 컨슈머는 아예 메시지를 읽을 수 없습니다.
JSON 직렬화의 장점을 정리하면 다음과 같습니다.
| 비교 항목 | Java 직렬화 | JSON 직렬화 |
|---|---|---|
| 언어 독립성 | Java만 가능 | 모든 언어에서 파싱 가능 |
| 클래스 결합도 | 패키지 경로까지 일치해야 함 | 필드명만 맞으면 역직렬화 가능 |
| 스키마 변경 | 필드 추가/삭제 시 역직렬화 실패 위험 | @JsonIgnoreProperties로 유연하게 대응 |
| 가독성 | 바이너리 → 디버깅 어려움 | 사람이 읽을 수 있음 |
| 보안 | 역직렬화 공격(Deserialization Attack) 취약 | 상대적으로 안전 |
Jackson2JsonMessageConverter 설정
@Configuration
public class RabbitConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
// Java 8 날짜/시간 타입 지원
objectMapper.registerModule(new JavaTimeModule());
// 알 수 없는 필드 무시 (스키마 변경에 유연하게 대응)
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Jackson2JsonMessageConverter(objectMapper);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter converter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(converter);
return template;
}
}
이렇게 설정하면 convertAndSend()로 보낸 객체가 자동으로 JSON으로 직렬화되고, @RabbitListener에서 받을 때도 JSON에서 Java 객체로 자동 역직렬화됩니다.
컨슈머 쪽 MessageConverter 설정
@RabbitListener가 사용하는 컨테이너 팩토리에도 동일한 컨버터를 설정해야 합니다.
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter converter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(converter);
return factory;
}
수동 ACK 전환
이전 ACK/NACK 글에서 Auto-ack와 Manual-ack의 차이를 다뤘었습니다. Spring Boot에서는 기본적으로 AUTO 모드가 적용됩니다. 이 모드는 리스너 메서드가 정상 리턴하면 ACK, 예외가 발생하면 NACK을 자동으로 보냅니다.
하지만 실무에서는 수동 ACK가 필요한 경우가 자주 있습니다.
acknowledge-mode 비교
| 모드 | 동작 | 사용 시점 |
|---|---|---|
NONE | 메시지 수신 즉시 ACK (RabbitMQ의 auto-ack) | 로그 등 유실 허용 가능한 데이터 |
AUTO (기본값) | 리스너 정상 리턴 → ACK, 예외 → NACK | 대부분의 일반적인 경우 |
MANUAL | 개발자가 직접 channel.basicAck() 호출 | 세밀한 제어가 필요한 경우 |
수동 ACK 설정
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
수동 ACK 코드
@Service
@Slf4j
public class OrderConsumer {
@RabbitListener(queues = "order.queue")
public void handleOrder(OrderEvent event,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
// 비즈니스 로직 처리
processOrder(event);
// 처리 성공 → ACK
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("주문 처리 실패: {}", event.getOrderId(), e);
// 처리 실패 → NACK (requeue=false → DLQ로 이동)
channel.basicNack(deliveryTag, false, false);
}
}
}
수동 ACK를 쓰면서
channel.basicAck()을 호출하지 않으면, 메시지가 Unacked 상태로 계속 남아 있습니다. Unacked 메시지가prefetchCount만큼 쌓이면 RabbitMQ는 해당 컨슈머에게 더 이상 메시지를 보내지 않습니다. 메시지가 큐에 쌓이기만 하고 소비되지 않는 현상이 발생하면 ACK 누락을 의심해보세요.
prefetchCount 설정
한 번에 가져올 미확인 메시지 수를 제한하여 컨슈머의 부하를 조절할 수 있습니다.
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
prefetch: 10 # 한 번에 최대 10개의 미확인 메시지
전체 설정 정리
지금까지의 내용을 하나의 설정 클래스로 정리합니다.
@Configuration
public class RabbitConfig {
// --- MessageConverter ---
@Bean
public Jackson2JsonMessageConverter messageConverter() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Jackson2JsonMessageConverter(mapper);
}
// --- RabbitTemplate ---
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory cf,
Jackson2JsonMessageConverter converter) {
RabbitTemplate tpl = new RabbitTemplate(cf);
tpl.setMessageConverter(converter);
return tpl;
}
// --- Listener Container Factory ---
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory cf,
Jackson2JsonMessageConverter converter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf);
factory.setMessageConverter(converter);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(10);
return factory;
}
// --- 인프라 선언 ---
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue").build();
}
@Bean
public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue).to(orderExchange).with("order.created");
}
}
주의할 점
-
**MessageConverter 불일치 **: 프로듀서와 컨슈머에서 같은
MessageConverter를 사용해야 합니다. 한쪽은 JSON, 한쪽은 Java 직렬화를 쓰면 역직렬화에 실패합니다. -
**Queue 설정 충돌 **: 이미 RabbitMQ에 존재하는 Queue와 다른 속성(durable, arguments 등)으로 같은 이름의 Queue를 선언하면
PRECONDITION_FAILED에러가 발생합니다. 기존 Queue를 삭제하거나 동일한 설정으로 맞춰야 합니다. -
**ACK 누락 **: 수동 ACK 모드에서 예외 처리 경로를 빠짐없이 처리하지 않으면 메시지가 Unacked 상태로 남아 큐가 막힐 수 있습니다.
try-catch에서 모든 경로에 ACK 또는 NACK을 보내야 합니다. -
** 커넥션 관리 **:
CachingConnectionFactory는 기본적으로 하나의 커넥션을 캐싱합니다. 프로듀서와 컨슈머가 동일 애플리케이션에 있을 때, 부하가 높으면publisherConnectionFactory를 별도로 분리하는 것이 좋습니다.
정리
| 구성 요소 | Spring Boot에서의 구현 |
|---|---|
| 의존성 | spring-boot-starter-amqp |
| 접속 설정 | application.yml의 spring.rabbitmq.* |
| 메시지 발행 | RabbitTemplate.convertAndSend() |
| 메시지 소비 | @RabbitListener + SimpleMessageListenerContainer |
| 직렬화 | Jackson2JsonMessageConverter 빈 등록 |
| 수동 ACK | acknowledge-mode: manual + Channel.basicAck() |
| 인프라 선언 | Exchange, Queue, Binding 빈 → RabbitAdmin 자동 등록 |
이번 글에서는 Spring Boot에서 RabbitMQ를 연동하는 기본기를 다뤘습니다. 의존성 추가부터 메시지 발행, 소비, JSON 직렬화, 수동 ACK까지 실무에서 바로 쓸 수 있는 설정을 정리했습니다.
다음 글에서는 메시지 처리 중 에러가 발생했을 때의 에러 핸들링과 재시도 전략 — RetryTemplate, ErrorHandler, DLQ를 활용한 실패 메시지 관리를 다뤄보겠습니다.