코루틴 기초를 넘어서면 Flow, Channel, 구조화된 동시성 같은 개념이 등장합니다. 특히 Flow는 안드로이드에서 LiveData를 대체하는 추세라 이해해야 할 시점입니다.

Flow — Cold Stream의 의미

Flow는 비동기적으로 여러 값을 순차적으로 방출하는 Cold Stream 입니다.

"Cold"라는 건 collect()를 호출하기 전까지 아무것도 실행하지 않는다 는 뜻입니다. Sequence의 비동기 버전이라고 생각하면 이해가 빠릅니다.

KOTLIN
// Flow 생성 — 이 시점에서는 아무것도 실행되지 않음
fun numbersFlow(): Flow<Int> = flow {
    println("Flow 시작")
    for (i in 1..3) {
        delay(100)
        emit(i)  // 값을 방출
    }
}

// collect()를 호출해야 비로소 실행됨
suspend fun main() {
    val flow = numbersFlow()
    println("collect 호출 전")

    flow.collect { value ->
        println("수신: $value")
    }
}
// 출력:
// collect 호출 전
// Flow 시작
// 수신: 1
// 수신: 2
// 수신: 3

Flow 연산자

Flow는 컬렉션처럼 다양한 중간 연산자를 제공합니다. 모두 지연 평가(lazy) 입니다.

KOTLIN
flowOf(1, 2, 3, 4, 5)
    .filter { it % 2 == 0 }
    .map { it * 10 }
    .collect { println(it) }  // 20, 40

// 실무에서 자주 쓰는 패턴
repository.observeUsers()
    .map { users -> users.filter { it.isActive } }
    .catch { e -> emit(emptyList()) }  // 에러 처리
    .collect { activeUsers ->
        updateUI(activeUsers)
    }

주요 연산자를 정리하면 이렇습니다.

  • map, filter: 컬렉션과 동일한 변환/필터링
  • take(n): 처음 n개만 수집
  • catch: 업스트림의 예외를 잡음 (다운스트림은 못 잡음)
  • onEach: 각 값에 대해 부수 효과 수행
  • flatMapConcat / flatMapMerge: Flow를 평탄화 (순차/병렬)
  • combine / zip: 두 Flow를 합침

flowOn — Flow의 실행 스레드 지정

KOTLIN
flow {
    // Dispatchers.IO에서 실행됨
    emit(readFile("data.json"))
}
.flowOn(Dispatchers.IO)  // 업스트림의 디스패처를 변경
.collect { data ->
    // collect는 호출한 코루틴의 디스패처에서 실행됨
    updateUI(data)
}

핵심: flowOn은 ** 업스트림(자기보다 위쪽)**의 실행 컨텍스트만 변경합니다. collect 쪽(다운스트림)에는 영향을 주지 않습니다.

StateFlow와 SharedFlow — Hot Stream

Flow는 Cold Stream이라 수집할 때마다 처음부터 다시 실행됩니다. 하지만 UI 상태처럼 ** 항상 최신 값을 유지하고 여러 구독자에게 전달 **해야 하는 경우에는 Hot Stream이 필요합니다.

StateFlow — 상태를 위한 Flow

KOTLIN
class CounterViewModel : ViewModel() {
    // 초기값 필수
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        _count.value++  // 값 업데이트
    }
}

// UI에서 수집
viewModel.count.collect { count ->
    textView.text = "카운트: $count"
}

StateFlow의 특징을 정리하면 이렇습니다.

  • 항상 ** 현재 값 **을 가지고 있음 (.value로 접근 가능)
  • ** 초기값이 필수**
  • ** 같은 값이 연속으로 설정되면 방출하지 않음** (distinctUntilChanged 내장)
  • 새 구독자는 즉시 ** 최신 값 **을 받음

SharedFlow — 이벤트를 위한 Flow

KOTLIN
class EventViewModel : ViewModel() {
    // 초기값 없음
    private val _events = MutableSharedFlow<UiEvent>()
    val events: SharedFlow<UiEvent> = _events.asSharedFlow()

    fun showToast(message: String) {
        viewModelScope.launch {
            _events.emit(UiEvent.Toast(message))
        }
    }
}

SharedFlow와 StateFlow의 차이를 정리합니다.

구분StateFlowSharedFlow
초기값필수불필요
현재 값 접근.value로 가능불가 (replay로 대체)
같은 값 방출무시 (distinctUntilChanged)모두 방출
주 사용처UI 상태 관리일회성 이벤트 (토스트, 네비게이션)
KOTLIN
// SharedFlow 설정 옵션
MutableSharedFlow<Event>(
    replay = 0,           // 새 구독자에게 이전 값 몇 개를 전달할지
    extraBufferCapacity = 1,  // 추가 버퍼 크기
    onBufferOverflow = BufferOverflow.DROP_OLDEST  // 버퍼 초과 시 정책
)

Channel — 코루틴 간 통신

Channel은 ** 코루틴 사이에서 데이터를 주고받는 파이프 **입니다. Flow와 달리 Hot Stream 이고, 송신자(producer)와 수신자(consumer)가 분리됩니다.

KOTLIN
val channel = Channel<Int>()

// 송신 코루틴
launch {
    for (i in 1..5) {
        channel.send(i)
        println("송신: $i")
    }
    channel.close()  // 더 이상 보낼 데이터 없음
}

// 수신 코루틴
launch {
    for (value in channel) {  // 채널이 닫힐 때까지 수신
        println("수신: $value")
    }
}

Channel 버퍼 전략

KOTLIN
// 버퍼 없음: 송수신이 만나야 전달 (기본값)
Channel<Int>(Channel.RENDEZVOUS)

// 버퍼 있음: 버퍼가 가득 차면 send가 중단됨
Channel<Int>(capacity = 10)

// 무제한: 메모리가 허용하는 만큼 버퍼링
Channel<Int>(Channel.UNLIMITED)

// 합류(Conflated): 최신 값만 유지, 이전 값은 버림
Channel<Int>(Channel.CONFLATED)

Flow vs Channel — 언제 뭘 쓰나

  • Flow: 데이터 스트림을 변환하고 수집하는 용도. 대부분의 경우 Flow가 적합
  • Channel: 코루틴 간 1:1 통신, 팬아웃(하나의 송신자 → 여러 수신자) 패턴에 적합
KOTLIN
// Fan-out: 여러 워커가 하나의 채널에서 작업을 가져감
val tasks = Channel<Task>(Channel.UNLIMITED)

repeat(3) { workerId ->
    launch {
        for (task in tasks) {
            println("워커 $workerId 처리: $task")
        }
    }
}

구조화된 동시성 — 코루틴의 생명주기 관리

코틀린 코루틴의 가장 중요한 설계 원칙이 ** 구조화된 동시성(Structured Concurrency)**입니다. 핵심은 이렇습니다.

  • 모든 코루틴은 ** 스코프(부모)에 속해야** 한다
  • 부모가 취소되면 ** 모든 자식도 취소 **된다
  • 부모는 ** 모든 자식이 완료될 때까지** 기다린다
KOTLIN
suspend fun loadDashboard() = coroutineScope {
    val profile = async { fetchProfile() }    // 자식 1
    val feed = async { fetchFeed() }          // 자식 2
    val notifications = async { fetchNotifs() } // 자식 3

    // coroutineScope는 세 자식이 모두 끝나야 반환됨
    Dashboard(profile.await(), feed.await(), notifications.await())
}

예외 전파 — 일반 Job vs SupervisorJob

기본 동작(일반 Job)에서는 자식 하나가 실패하면 ** 부모와 형제 모두 취소 **됩니다.

KOTLIN
// 일반 Job: 하나가 실패하면 전체 취소
coroutineScope {
    launch { fetchProfile() }   // 성공하더라도 취소됨
    launch { throw Exception("실패!") }  // 이게 실패하면
    launch { fetchFeed() }      // 이것도 취소됨
}

SupervisorJob 을 사용하면 자식의 실패가 다른 자식에게 전파되지 않습니다.

KOTLIN
// SupervisorJob: 실패한 자식만 취소
supervisorScope {
    launch { fetchProfile() }   // 계속 실행됨
    launch { throw Exception("실패!") }  // 이것만 취소됨
    launch { fetchFeed() }      // 계속 실행됨
}

CoroutineExceptionHandler

잡히지 않은 예외를 처리하는 핸들러입니다. launch에는 동작하지만, async에는 동작하지 않습니다 (async의 예외는 await()에서 발생).

KOTLIN
val handler = CoroutineExceptionHandler { _, exception ->
    println("잡힌 예외: ${exception.message}")
}

val scope = CoroutineScope(SupervisorJob() + handler)

scope.launch {
    throw RuntimeException("에러 발생!")
}
// 출력: 잡힌 예외: 에러 발생!

scope.async {
    throw RuntimeException("이건 handler에서 안 잡힘")
}
// await() 호출 시 예외 발생

코루틴 예외 처리의 핵심은 try-catch, CoroutineExceptionHandler, SupervisorJob 의 삼총사입니다.

실무 패턴: ViewModel에서의 예외 처리

KOTLIN
class MyViewModel : ViewModel() {
    // SupervisorJob이 기본 포함된 viewModelScope
    fun loadData() {
        viewModelScope.launch {
            try {
                val data = repository.fetchData()
                _state.value = UiState.Success(data)
            } catch (e: Exception) {
                _state.value = UiState.Error(e.message)
            }
        }
    }
}

정리

코루틴 심화 개념을 핵심을 요약하면 이렇습니다.

  • Flow: Cold Stream. collect() 전까지 실행 안 됨. 데이터 스트림의 기본
  • StateFlow: 항상 최신 값을 유지하는 Hot Stream. UI 상태에 적합
  • SharedFlow: 이벤트 전달용 Hot Stream. 같은 값도 중복 방출 가능
  • Channel: 코루틴 간 1:1 통신 파이프. Fan-out 패턴에 유용
  • ** 구조화된 동시성 **: 부모-자식 관계로 생명주기 관리. SupervisorJob으로 실패 격리
  • ** 예외 처리 **: launch는 CoroutineExceptionHandler, async는 try-catch(await)
댓글 로딩 중...