R2DBC 개념은 알겠는데, 실제로 쿼리를 작성하고 트랜잭션을 관리하려면 어디서부터 시작해야 할까요?

실전 셋업부터 시작하기

Spring Boot에서 R2DBC를 사용하려면 의존성 두 개만 추가하면 됩니다.

GRADLE
// build.gradle
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
    // 사용할 DB 드라이버 — 여기서는 PostgreSQL
    runtimeOnly 'org.postgresql:r2dbc-postgresql'
    // 커넥션 풀 (Spring Boot가 자동 감지)
    runtimeOnly 'io.r2dbc:r2dbc-pool'
}

application.yml 설정은 JDBC와 비슷하지만, URL 스킴이 r2dbc:로 시작합니다.

YAML
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: myuser
    password: mypassword

Spring Boot의 자동 설정이 ConnectionFactory, DatabaseClient, R2dbcTransactionManager를 모두 빈으로 등록해 줍니다. 별도 @Configuration을 만들 일이 거의 없습니다.

엔티티 정의

R2DBC 엔티티는 JPA와 달리 아주 단순합니다. @Entity 대신 @Table을 사용하고, 연관관계 매핑이 없습니다.

JAVA
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;

@Table("orders")
public class Order {

    @Id
    private Long id;
    private Long userId;
    private String productName;
    private int quantity;
    private int totalPrice;

    // 기본 생성자 + getter/setter 생략
}

JPA에 익숙하다면 @Column, @OneToMany 같은 어노테이션이 없어서 허전할 수 있습니다. R2DBC는 ORM이 아니라 SQL 매퍼에 가깝다고 생각하면 편합니다.

ReactiveCrudRepository — 간단한 CRUD

JPA의 CrudRepository처럼 인터페이스만 정의하면 구현체가 자동 생성됩니다.

JAVA
public interface OrderRepository extends ReactiveCrudRepository<Order, Long> {

    // 메서드 이름 기반 쿼리 — Spring Data가 자동으로 SQL 생성
    Flux<Order> findByUserId(Long userId);

    // @Query로 직접 SQL 작성도 가능
    @Query("SELECT * FROM orders WHERE product_name = :name AND quantity > :minQty")
    Flux<Order> findByProductAndMinQuantity(String name, int minQty);
}

사용법은 JPA Repository와 거의 동일합니다. findById()Mono, findAll()Flux를 반환한다는 점만 다릅니다. 간단한 CRUD는 Repository만으로 충분하지만, 실무에서는 조인이나 동적 조건 같은 복잡한 쿼리가 필요할 때가 많습니다.

DatabaseClient — 진짜 실전은 여기부터

DatabaseClientJdbcTemplate의 리액티브 버전입니다. SQL을 직접 작성하고, 결과를 수동으로 매핑합니다.

JAVA
@Repository
@RequiredArgsConstructor
public class OrderCustomRepository {

    private final DatabaseClient databaseClient;

    // 여러 건 조회 — .all()은 Flux로 반환
    public Flux<Order> findExpensiveOrders(int minPrice) {
        return databaseClient.sql("""
                SELECT id, user_id, product_name, quantity, total_price
                FROM orders
                WHERE total_price >= :minPrice
                ORDER BY total_price DESC
                """)
                .bind("minPrice", minPrice)
                .map(row -> new Order(
                        row.get("id", Long.class),
                        row.get("user_id", Long.class),
                        row.get("product_name", String.class),
                        row.get("quantity", Integer.class),
                        row.get("total_price", Integer.class)
                ))
                .all();
    }
}

결과 반환 메서드는 세 가지가 있습니다.

  • .all()Flux로 여러 건 반환
  • .one()Mono로 단건 반환 (결과가 없으면 빈 Mono, 2건 이상이면 에러)
  • .first()Mono로 첫 번째 건만 반환

Row 매핑을 깔끔하게 분리하기

매번 row.get(...)을 반복하면 코드가 지저분해집니다. 매핑 로직을 분리하는 게 좋습니다.

JAVA
// 매핑 함수를 상수로 분리
private static final BiFunction<Row, RowMetadata, Order> ORDER_MAPPER =
        (row, meta) -> new Order(
                row.get("id", Long.class),
                row.get("user_id", Long.class),
                row.get("product_name", String.class),
                row.get("quantity", Integer.class),
                row.get("total_price", Integer.class)
        );

// 사용할 때 — 훨씬 깔끔
public Flux<Order> findExpensiveOrders(int minPrice) {
    return databaseClient.sql("SELECT * FROM orders WHERE total_price >= :minPrice")
            .bind("minPrice", minPrice)
            .map(ORDER_MAPPER)
            .all();
}

조인과 집계 — 복잡한 쿼리 패턴

R2DBC는 JPA처럼 연관관계를 자동으로 매핑해 주지 않습니다. 조인 쿼리는 직접 SQL을 작성하고 DTO로 매핑해야 합니다.

JAVA
// 주문 + 사용자 정보를 함께 조회하는 DTO
public record OrderWithUser(Long orderId, String productName,
                            String userName, String email) {}

public Flux<OrderWithUser> findOrdersWithUser() {
    return databaseClient.sql("""
            SELECT o.id AS order_id, o.product_name,
                   u.name AS user_name, u.email
            FROM orders o
            JOIN users u ON o.user_id = u.id
            ORDER BY o.id DESC
            """)
            .map(row -> new OrderWithUser(
                    row.get("order_id", Long.class),
                    row.get("product_name", String.class),
                    row.get("user_name", String.class),
                    row.get("email", String.class)
            ))
            .all();
}

// 집계 쿼리도 동일한 패턴
public Mono<Long> countByUserId(Long userId) {
    return databaseClient.sql("SELECT COUNT(*) AS cnt FROM orders WHERE user_id = :userId")
            .bind("userId", userId)
            .map(row -> row.get("cnt", Long.class))
            .one();
}

연관관계 매핑이 없다는 건 단점이지만, 반대로 생각하면 어떤 SQL이 실행되는지 100% 제어할 수 있다는 뜻이기도 합니다. N+1 문제 같은 것도 원천적으로 발생하지 않습니다.

트랜잭션 관리

선언적 방식 — @Transactional

가장 간단한 방법입니다. JPA에서 쓰던 것과 동일합니다.

JAVA
@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final PointRepository pointRepository;

    @Transactional // ReactiveTransactionManager가 자동으로 관리
    public Mono<Order> placeOrder(Order order, int pointsToDeduct) {
        return orderRepository.save(order)
                .flatMap(savedOrder ->
                        // 포인트 차감 — 실패하면 주문도 롤백
                        pointRepository.deductPoints(order.getUserId(), pointsToDeduct)
                                .thenReturn(savedOrder)
                );
    }
}

프로그래밍 방식 — TransactionalOperator

@Transactional을 쓸 수 없는 상황(예: 동적으로 트랜잭션 범위를 결정해야 할 때)에는 TransactionalOperator를 사용합니다.

JAVA
@RequiredArgsConstructor
public class OrderService {

    private final DatabaseClient databaseClient;
    private final TransactionalOperator txOperator;

    public Mono<Void> transferOrder(Long fromUserId, Long toUserId, Long orderId) {
        return databaseClient.sql(
                        "UPDATE orders SET user_id = :to WHERE id = :id AND user_id = :from")
                .bind("to", toUserId)
                .bind("id", orderId)
                .bind("from", fromUserId)
                .fetch().rowsUpdated()
                .flatMap(count -> count == 0
                        ? Mono.error(new IllegalStateException("주문을 찾을 수 없습니다"))
                        : Mono.<Void>empty())
                .as(txOperator::transactional); // 트랜잭션으로 감싸기
    }
}

리액티브 환경에서 @Transactional은 내부적으로 Reactor Context를 통해 트랜잭션 정보를 전파합니다. ThreadLocal 기반인 기존 방식과 다르기 때문에, subscribeOn으로 스레드를 전환해도 트랜잭션이 유지됩니다.

커넥션 풀 설정

r2dbc-pool이 클래스패스에 있으면 Spring Boot가 자동으로 커넥션 풀을 구성합니다.

YAML
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: myuser
    password: mypassword
    pool:
      initial-size: 5       # 시작 시 생성할 커넥션 수
      max-size: 20           # 최대 커넥션 수
      max-idle-time: 30m     # 유휴 커넥션 최대 유지 시간
      max-life-time: 60m     # 커넥션 최대 수명
      validation-query: "SELECT 1"  # 커넥션 유효성 검사 쿼리

커넥션 풀 크기를 정할 때 고려할 점이 있습니다.

  • 리액티브 환경은 스레드 수가 적으므로(보통 CPU 코어 수만큼), JDBC보다 커넥션 풀을 작게 잡아도 됩니다
  • max-size를 너무 크게 잡으면 DB 서버에 부하가 갑니다
  • 일반적으로 이벤트 루프 스레드 수 × 2~3 정도가 적절합니다

커넥션 풀이 가득 찬 상태에서 새 요청이 들어오면, 리액티브 방식에서는 스레드를 블로킹하지 않고 커넥션이 반환될 때까지 대기 큐에 넣습니다. 이 점이 JDBC 커넥션 풀과의 핵심 차이입니다.

에러 핸들링

리액티브 DB 연산에서 에러가 발생하면 onError* 연산자로 처리합니다.

JAVA
public Mono<Order> getOrderOrDefault(Long id) {
    return orderRepository.findById(id)
            // 결과가 없으면 기본값 반환
            .switchIfEmpty(Mono.error(new OrderNotFoundException(id)))
            // 특정 예외를 다른 예외로 변환
            .onErrorMap(DataAccessException.class,
                    ex -> new ServiceException("DB 조회 실패", ex))
            // 에러 발생 시 폴백
            .onErrorResume(OrderNotFoundException.class,
                    ex -> Mono.just(Order.empty()));
}

// INSERT 시 유니크 제약 조건 위반 처리
public Mono<Order> createOrder(Order order) {
    return orderRepository.save(order)
            .onErrorMap(DuplicateKeyException.class,
                    ex -> new DuplicateOrderException("중복 주문입니다"));
}

주의할 점이 하나 있습니다.

JAVA
// 이렇게 하면 안 됩니다 — 리액티브 체인 밖에서 try-catch
public Mono<Order> badExample(Long id) {
    try {
        return orderRepository.findById(id); // 에러는 구독 시점에 발생
    } catch (Exception e) {
        // 여기서 잡히지 않습니다!
        return Mono.empty();
    }
}

리액티브 프로그래밍에서 에러는 구독 시점에 발생합니다. 전통적인 try-catch가 아닌 리액티브 연산자(onErrorMap, onErrorResume, onErrorReturn)로 처리해야 합니다.

테스트 — @DataR2dbcTest

@DataR2dbcTest는 R2DBC 관련 빈만 로드하는 슬라이스 테스트 어노테이션입니다.

JAVA
@DataR2dbcTest
class OrderRepositoryTest {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private DatabaseClient databaseClient;

    @BeforeEach
    void setUp() {
        // 테스트 데이터 초기화
        databaseClient.sql("DELETE FROM orders").fetch().rowsUpdated().block();
    }

    @Test
    void 사용자_주문을_조회한다() {
        // given — 테스트 데이터 삽입
        Order order = new Order(null, 1L, "키보드", 2, 150000);
        orderRepository.save(order).block();

        // when & then — StepVerifier로 리액티브 스트림 검증
        StepVerifier.create(orderRepository.findByUserId(1L))
                .assertNext(found -> {
                    assertThat(found.getProductName()).isEqualTo("키보드");
                    assertThat(found.getQuantity()).isEqualTo(2);
                })
                .verifyComplete();
    }

    @Test
    void 존재하지_않는_주문을_조회하면_빈_Mono를_반환한다() {
        StepVerifier.create(orderRepository.findById(999L))
                .verifyComplete(); // 원소 없이 완료
    }
}

실제 DB로 테스트하려면 Testcontainers를 함께 사용합니다. @DynamicPropertySource로 R2DBC URL을 컨테이너 주소로 덮어쓰면 됩니다.

실전에서 기억할 것들

정리하면 이렇습니다.

  • 간단한 CRUD: ReactiveCrudRepository로 충분합니다
  • ** 복잡한 쿼리 **: DatabaseClient로 SQL을 직접 작성합니다
  • ** 트랜잭션 **: @Transactional이 기본, 동적 제어가 필요하면 TransactionalOperator
  • ** 커넥션 풀 **: 리액티브 환경에서는 JDBC보다 작게 잡아도 됩니다
  • ** 에러 처리 **: try-catch가 아닌 리액티브 연산자를 사용합니다
  • ** 테스트 **: @DataR2dbcTest + StepVerifier가 기본 조합입니다

R2DBC는 JPA에 비하면 수동으로 해야 할 일이 많지만, 그만큼 실행되는 SQL을 정확히 제어할 수 있고 리액티브 스택의 이점을 끝까지 살릴 수 있습니다. 처음에는 ReactiveCrudRepository로 시작하고, 복잡한 쿼리가 필요해지면 DatabaseClient를 점진적으로 도입하는 전략을 추천합니다.

댓글 로딩 중...