코루틴 심화 — Flow, Channel, 구조화된 동시성
코루틴 기초를 넘어서면 Flow, Channel, 구조화된 동시성 같은 개념이 등장합니다. 특히 Flow는 안드로이드에서 LiveData를 대체하는 추세라 이해해야 할 시점입니다.
Flow — Cold Stream의 의미
Flow는 비동기적으로 여러 값을 순차적으로 방출하는 Cold Stream 입니다.
"Cold"라는 건 collect()를 호출하기 전까지 아무것도 실행하지 않는다 는 뜻입니다. Sequence의 비동기 버전이라고 생각하면 이해가 빠릅니다.
// Flow 생성 — 이 시점에서는 아무것도 실행되지 않음
fun numbersFlow(): Flow<Int> = flow {
println("Flow 시작")
for (i in 1..3) {
delay(100)
emit(i) // 값을 방출
}
}
// collect()를 호출해야 비로소 실행됨
suspend fun main() {
val flow = numbersFlow()
println("collect 호출 전")
flow.collect { value ->
println("수신: $value")
}
}
// 출력:
// collect 호출 전
// Flow 시작
// 수신: 1
// 수신: 2
// 수신: 3
Flow 연산자
Flow는 컬렉션처럼 다양한 중간 연산자를 제공합니다. 모두 지연 평가(lazy) 입니다.
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 }
.map { it * 10 }
.collect { println(it) } // 20, 40
// 실무에서 자주 쓰는 패턴
repository.observeUsers()
.map { users -> users.filter { it.isActive } }
.catch { e -> emit(emptyList()) } // 에러 처리
.collect { activeUsers ->
updateUI(activeUsers)
}
주요 연산자를 정리하면 이렇습니다.
- map, filter: 컬렉션과 동일한 변환/필터링
- take(n): 처음 n개만 수집
- catch: 업스트림의 예외를 잡음 (다운스트림은 못 잡음)
- onEach: 각 값에 대해 부수 효과 수행
- flatMapConcat / flatMapMerge: Flow를 평탄화 (순차/병렬)
- combine / zip: 두 Flow를 합침
flowOn — Flow의 실행 스레드 지정
flow {
// Dispatchers.IO에서 실행됨
emit(readFile("data.json"))
}
.flowOn(Dispatchers.IO) // 업스트림의 디스패처를 변경
.collect { data ->
// collect는 호출한 코루틴의 디스패처에서 실행됨
updateUI(data)
}
핵심: flowOn은 ** 업스트림(자기보다 위쪽)**의 실행 컨텍스트만 변경합니다. collect 쪽(다운스트림)에는 영향을 주지 않습니다.
StateFlow와 SharedFlow — Hot Stream
Flow는 Cold Stream이라 수집할 때마다 처음부터 다시 실행됩니다. 하지만 UI 상태처럼 ** 항상 최신 값을 유지하고 여러 구독자에게 전달 **해야 하는 경우에는 Hot Stream이 필요합니다.
StateFlow — 상태를 위한 Flow
class CounterViewModel : ViewModel() {
// 초기값 필수
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count.asStateFlow()
fun increment() {
_count.value++ // 값 업데이트
}
}
// UI에서 수집
viewModel.count.collect { count ->
textView.text = "카운트: $count"
}
StateFlow의 특징을 정리하면 이렇습니다.
- 항상 ** 현재 값 **을 가지고 있음 (
.value로 접근 가능) - ** 초기값이 필수**
- ** 같은 값이 연속으로 설정되면 방출하지 않음** (distinctUntilChanged 내장)
- 새 구독자는 즉시 ** 최신 값 **을 받음
SharedFlow — 이벤트를 위한 Flow
class EventViewModel : ViewModel() {
// 초기값 없음
private val _events = MutableSharedFlow<UiEvent>()
val events: SharedFlow<UiEvent> = _events.asSharedFlow()
fun showToast(message: String) {
viewModelScope.launch {
_events.emit(UiEvent.Toast(message))
}
}
}
SharedFlow와 StateFlow의 차이를 정리합니다.
| 구분 | StateFlow | SharedFlow |
|---|---|---|
| 초기값 | 필수 | 불필요 |
| 현재 값 접근 | .value로 가능 | 불가 (replay로 대체) |
| 같은 값 방출 | 무시 (distinctUntilChanged) | 모두 방출 |
| 주 사용처 | UI 상태 관리 | 일회성 이벤트 (토스트, 네비게이션) |
// SharedFlow 설정 옵션
MutableSharedFlow<Event>(
replay = 0, // 새 구독자에게 이전 값 몇 개를 전달할지
extraBufferCapacity = 1, // 추가 버퍼 크기
onBufferOverflow = BufferOverflow.DROP_OLDEST // 버퍼 초과 시 정책
)
Channel — 코루틴 간 통신
Channel은 ** 코루틴 사이에서 데이터를 주고받는 파이프 **입니다. Flow와 달리 Hot Stream 이고, 송신자(producer)와 수신자(consumer)가 분리됩니다.
val channel = Channel<Int>()
// 송신 코루틴
launch {
for (i in 1..5) {
channel.send(i)
println("송신: $i")
}
channel.close() // 더 이상 보낼 데이터 없음
}
// 수신 코루틴
launch {
for (value in channel) { // 채널이 닫힐 때까지 수신
println("수신: $value")
}
}
Channel 버퍼 전략
// 버퍼 없음: 송수신이 만나야 전달 (기본값)
Channel<Int>(Channel.RENDEZVOUS)
// 버퍼 있음: 버퍼가 가득 차면 send가 중단됨
Channel<Int>(capacity = 10)
// 무제한: 메모리가 허용하는 만큼 버퍼링
Channel<Int>(Channel.UNLIMITED)
// 합류(Conflated): 최신 값만 유지, 이전 값은 버림
Channel<Int>(Channel.CONFLATED)
Flow vs Channel — 언제 뭘 쓰나
- Flow: 데이터 스트림을 변환하고 수집하는 용도. 대부분의 경우 Flow가 적합
- Channel: 코루틴 간 1:1 통신, 팬아웃(하나의 송신자 → 여러 수신자) 패턴에 적합
// Fan-out: 여러 워커가 하나의 채널에서 작업을 가져감
val tasks = Channel<Task>(Channel.UNLIMITED)
repeat(3) { workerId ->
launch {
for (task in tasks) {
println("워커 $workerId 처리: $task")
}
}
}
구조화된 동시성 — 코루틴의 생명주기 관리
코틀린 코루틴의 가장 중요한 설계 원칙이 ** 구조화된 동시성(Structured Concurrency)**입니다. 핵심은 이렇습니다.
- 모든 코루틴은 ** 스코프(부모)에 속해야** 한다
- 부모가 취소되면 ** 모든 자식도 취소 **된다
- 부모는 ** 모든 자식이 완료될 때까지** 기다린다
suspend fun loadDashboard() = coroutineScope {
val profile = async { fetchProfile() } // 자식 1
val feed = async { fetchFeed() } // 자식 2
val notifications = async { fetchNotifs() } // 자식 3
// coroutineScope는 세 자식이 모두 끝나야 반환됨
Dashboard(profile.await(), feed.await(), notifications.await())
}
예외 전파 — 일반 Job vs SupervisorJob
기본 동작(일반 Job)에서는 자식 하나가 실패하면 ** 부모와 형제 모두 취소 **됩니다.
// 일반 Job: 하나가 실패하면 전체 취소
coroutineScope {
launch { fetchProfile() } // 성공하더라도 취소됨
launch { throw Exception("실패!") } // 이게 실패하면
launch { fetchFeed() } // 이것도 취소됨
}
SupervisorJob 을 사용하면 자식의 실패가 다른 자식에게 전파되지 않습니다.
// SupervisorJob: 실패한 자식만 취소
supervisorScope {
launch { fetchProfile() } // 계속 실행됨
launch { throw Exception("실패!") } // 이것만 취소됨
launch { fetchFeed() } // 계속 실행됨
}
CoroutineExceptionHandler
잡히지 않은 예외를 처리하는 핸들러입니다. launch에는 동작하지만, async에는 동작하지 않습니다 (async의 예외는 await()에서 발생).
val handler = CoroutineExceptionHandler { _, exception ->
println("잡힌 예외: ${exception.message}")
}
val scope = CoroutineScope(SupervisorJob() + handler)
scope.launch {
throw RuntimeException("에러 발생!")
}
// 출력: 잡힌 예외: 에러 발생!
scope.async {
throw RuntimeException("이건 handler에서 안 잡힘")
}
// await() 호출 시 예외 발생
코루틴 예외 처리의 핵심은 try-catch, CoroutineExceptionHandler, SupervisorJob 의 삼총사입니다.
실무 패턴: ViewModel에서의 예외 처리
class MyViewModel : ViewModel() {
// SupervisorJob이 기본 포함된 viewModelScope
fun loadData() {
viewModelScope.launch {
try {
val data = repository.fetchData()
_state.value = UiState.Success(data)
} catch (e: Exception) {
_state.value = UiState.Error(e.message)
}
}
}
}
정리
코루틴 심화 개념을 핵심을 요약하면 이렇습니다.
- Flow: Cold Stream. collect() 전까지 실행 안 됨. 데이터 스트림의 기본
- StateFlow: 항상 최신 값을 유지하는 Hot Stream. UI 상태에 적합
- SharedFlow: 이벤트 전달용 Hot Stream. 같은 값도 중복 방출 가능
- Channel: 코루틴 간 1:1 통신 파이프. Fan-out 패턴에 유용
- ** 구조화된 동시성 **: 부모-자식 관계로 생명주기 관리. SupervisorJob으로 실패 격리
- ** 예외 처리 **: launch는 CoroutineExceptionHandler, async는 try-catch(await)