스케줄링 알고리즘 — 작업 큐와 스레드 풀의 전략
작업이 100개인데 스레드가 4개뿐입니다. 어떤 순서로, 어떤 스레드에 작업을 배정해야 할까요?
개념 정의
스케줄링 알고리즘 은 제한된 자원(스레드, CPU, 서버)에 작업을 배정하는 전략입니다. OS의 CPU 스케줄링에서 출발했지만, 백엔드에서는 스레드 풀, 작업 큐, 메시지 컨슈머 등 곳곳에서 사용됩니다.
왜 필요한가
- **스레드 풀 관리 **: ThreadPoolExecutor의 큐 정책 선택
- ** 비동기 작업 **: Spring @Async의 실행 전략
- ** 메시지 처리 **: Kafka Consumer의 파티션 할당
- ** 배치 처리 **: 대량 작업의 효율적 분배
FIFO (First In, First Out)
가장 기본적인 전략입니다. 먼저 들어온 작업을 먼저 처리합니다.
// LinkedBlockingQueue = FIFO
ExecutorService executor = new ThreadPoolExecutor(
4, // corePoolSize
4, // maximumPoolSize
0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100) // FIFO 큐
);
executor.submit(() -> processTask("task-1")); // 먼저 실행
executor.submit(() -> processTask("task-2")); // 다음 실행
- ** 장점 **: 공정, 구현 간단, 기아(starvation) 없음
- ** 단점 **: 긴급 작업도 순서를 기다려야 함
Priority Queue
우선순위가 높은 작업을 먼저 처리합니다.
// 우선순위 기반 스레드 풀
ExecutorService executor = new ThreadPoolExecutor(
4, 4, 0L, TimeUnit.SECONDS,
new PriorityBlockingQueue<>(100)
);
// 우선순위를 가진 작업
public class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final int priority;
private final Runnable task;
public PriorityTask(int priority, Runnable task) {
this.priority = priority;
this.task = task;
}
@Override
public int compareTo(PriorityTask other) {
return Integer.compare(this.priority, other.priority); // 낮은 숫자 = 높은 우선순위
}
@Override
public void run() { task.run(); }
}
// 긴급 작업은 우선순위 1, 일반 작업은 우선순위 10
executor.execute(new PriorityTask(10, () -> normalProcess()));
executor.execute(new PriorityTask(1, () -> urgentProcess())); // 먼저 실행!
- ** 장점 **: 중요한 작업 우선 처리
- ** 단점 **: 낮은 우선순위 작업의 기아(starvation) 가능
Work Stealing
Java의 ForkJoinPool 이 사용하는 전략입니다. 각 스레드가 자신만의 작업 큐(deque)를 가지고, 자신의 큐가 비면 **다른 스레드의 큐에서 작업을 훔쳐옵니다 **.
// ForkJoinPool — Work Stealing
ForkJoinPool pool = new ForkJoinPool(4); // 4개 스레드
// RecursiveTask 예제 — 배열 합 계산
public class SumTask extends RecursiveTask<Long> {
private final int[] array;
private final int start, end;
private static final int THRESHOLD = 1000;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 작은 작업은 직접 계산
long sum = 0;
for (int i = start; i < end; i++) sum += array[i];
return sum;
}
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 왼쪽을 다른 스레드에 맡김
long rightResult = right.compute(); // 오른쪽은 현재 스레드에서 계산
long leftResult = left.join(); // 왼쪽 결과 대기
return leftResult + rightResult;
}
}
Long result = pool.invoke(new SumTask(array, 0, array.length));
Work Stealing의 동작
스레드0: [작업A, 작업B, 작업C] ← 자신의 deque 앞에서 꺼냄
스레드1: [작업D]
스레드2: [] ← 비어있음!
스레드3: [작업E, 작업F]
스레드2가 유휴 → 스레드0의 deque 뒤에서 작업C를 훔쳐옴
앞에서 꺼내고 뒤에서 훔치므로 경합이 최소화됩니다.
ThreadPoolExecutor 동작 원리
Java의 ThreadPoolExecutor는 다음 순서로 작업을 처리합니다.
1. 스레드 수 < corePoolSize → 새 스레드 생성
2. 스레드 수 >= corePoolSize → 큐에 추가
3. 큐가 가득 참 → maximumPoolSize까지 스레드 추가
4. 스레드 수 == maximumPoolSize && 큐도 가득 참 → 거부 정책 실행
거부 정책
// 1. AbortPolicy (기본): RejectedExecutionException 던짐
new ThreadPoolExecutor.AbortPolicy()
// 2. CallerRunsPolicy: 호출 스레드가 직접 실행 (백프레셔 효과)
new ThreadPoolExecutor.CallerRunsPolicy()
// 3. DiscardPolicy: 조용히 버림
new ThreadPoolExecutor.DiscardPolicy()
// 4. DiscardOldestPolicy: 큐의 가장 오래된 작업을 버리고 새 작업 추가
new ThreadPoolExecutor.DiscardOldestPolicy()
실무에서는 CallerRunsPolicy 가 자주 추천됩니다. 호출자가 직접 실행하므로 자연스럽게 요청 속도가 조절됩니다.
Spring @Async
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
@Service
public class NotificationService {
@Async("taskExecutor")
public CompletableFuture<Void> sendEmailAsync(String to, String content) {
// 비동기 실행
emailSender.send(to, content);
return CompletableFuture.completedFuture(null);
}
}
@Async 주의점
// 1. 같은 클래스 내 호출은 프록시를 거치지 않아 비동기가 안 됨
@Service
public class MyService {
public void method1() {
method2(); // 비동기 안 됨! (프록시 우회)
}
@Async
public void method2() { /* ... */ }
}
// 2. 반환 타입이 void면 예외가 전파되지 않음
// → AsyncUncaughtExceptionHandler 설정 필요
Kafka Consumer 파티션 할당
Kafka에서 파티션을 컨슈머에 할당하는 것도 스케줄링입니다.
// Kafka 파티션 할당 전략
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.RangeAssignor");
| 전략 | 설명 |
|---|---|
| RangeAssignor | 토픽별로 파티션을 연속 범위로 할당 |
| RoundRobinAssignor | 모든 파티션을 라운드 로빈으로 할당 |
| StickyAssignor | 기존 할당을 최대한 유지하며 균형 맞춤 |
| CooperativeStickyAssignor | 점진적 리밸런싱 지원 |
알고리즘 비교
| 알고리즘 | 공정성 | 처리량 | 복잡도 | 사용처 |
|---|---|---|---|---|
| FIFO | 높음 | 보통 | O(1) | 기본 스레드 풀 |
| Priority | 낮음 | 높음 | O(log N) | 긴급 작업 처리 |
| Work Stealing | 높음 | 높음 | O(1) 평균 | ForkJoinPool |
| Round Robin | 높음 | 보통 | O(1) | 파티션 할당 |
주의할 점
ThreadPoolExecutor의 큐 정책을 이해하지 않고 설정하는 실수
LinkedBlockingQueue(무제한)를 사용하면 maximumPoolSize가 의미 없어집니다. 큐가 무한히 쌓이고 스레드는 corePoolSize만 유지되기 때문입니다. 큐 크기를 제한하거나 SynchronousQueue를 사용하는 것이 안전합니다.
Spring @Async에서 void 반환 시 예외가 조용히 사라지는 문제
void 반환의 @Async 메서드에서 예외가 발생하면 호출자에게 전파되지 않고 로그만 남습니다. AsyncUncaughtExceptionHandler를 반드시 설정해야 예외를 놓치지 않습니다.
정리
- FIFO: 가장 기본적인 전략. LinkedBlockingQueue + ThreadPoolExecutor로 구현합니다
- Priority: 긴급 작업 우선 처리. 기아 방지를 위해 에이징이 필요할 수 있습니다
- Work Stealing: 유휴 스레드가 바쁜 스레드의 작업을 가져가는 전략. ForkJoinPool이 사용합니다
- ThreadPoolExecutor 의 core/max/queue 설정이 작업 흐름을 결정합니다
- Spring @Async 는 내부적으로 스레드 풀 스케줄링을 사용하며, 설정 없이 쓰면 위험할 수 있습니다
- Kafka Consumer의 파티션 할당도 스케줄링 알고리즘의 일종입니다
댓글 로딩 중...