🌀 Kotlin 协程与 Flow 实战完全指南

📌 适合人群:有 Kotlin 基础,需要深入掌握协程和 Flow 的 Android/后端开发者
📌 阅读时长:约 25 分钟
📌 环境:Kotlin 2.0+ / kotlinx-coroutines 1.8+

文章目录


📖 前言

协程是 Kotlin 异步编程的基石,Flow 是响应式数据流的标准答案。本文聚焦实际项目中真正用得到的知识点,每个概念都配有可直接运行的代码示例。

💡 学完本文你将掌握

  • 4种协程构建器的选择策略
  • Android 生命周期感知的作用域管理
  • StateFlow 替代 LiveData 的完整方案
  • SharedFlow 事件总线模式
  • Flow 操作符链 + 异常重试
  • 5 种经过验证的项目实战模式

一、协程构建器

Kotlin 提供 4 种协程构建器,各有适用场景:

1.1 runBlocking — 桥接阻塞世界

// ⚠️ 仅用于 main 函数 / 单元测试,严禁在 Android 主线程使用!
fun main() = runBlocking {
    println("Hello")
    delay(1000)
    println("World")
}

1.2 launch — 发射后不管

// 返回 Job,可控制取消
fun testLaunch() = runBlocking {
    val job: Job = launch {
        repeat(1000) { i ->
            println("打印 $i")
            delay(500)
        }
    }
    delay(1300)
    job.cancel()  // 1.3秒后取消
    job.join()    // 等待取消完成
    println("已取消")
}

1.3 async — 需要返回值

// 返回 Deferred(继承 Job),通过 await() 获取结果
fun testAsync() = runBlocking {
    val deferred1 = async {
        delay(1000)
        "结果1"
    }
    val deferred2 = async {
        delay(800)
        "结果2"
    }
    // await() 挂起等待但不阻塞线程
    println("${deferred1.await()} + ${deferred2.await()}")  // 总耗时≈1000ms
}

1.4 coroutineScope — 结构化并发

// 任一子协程失败 → 所有子协程自动取消
suspend fun loadData() = coroutineScope {
    val user = async { fetchUser() }
    val order = async { fetchOrder() }
    "${user.await()} | ${order.await()}"
}

📊 四种构建器对比

构建器 返回值 是否阻塞 使用场景
runBlocking 任意 ✅ 阻塞线程 main / 测试
launch Job 不关心返回值
async Deferred<T> 需要返回值
coroutineScope 任意 结构化并发

二、协程作用域

⚠️ 黄金法则:绝不使用 GlobalScope!始终使用有生命周期的 Scope。

2.1 ViewModel 中的标准写法

class MyViewModel : ViewModel() {

    // viewModelScope:ViewModel 清除时自动取消所有协程
    fun loadData() {
        viewModelScope.launch(Dispatchers.IO) {
            val data = repository.fetchData()
            withContext(Dispatchers.Main) {
                _uiState.value = data  // 切回主线程更新 UI
            }
        }
    }
}

2.2 Activity / Fragment 中

class MyActivity : AppCompatActivity() {

    // lifecycleScope:生命周期感知,onDestroy 时自动取消
    fun observeData() {
        lifecycleScope.launch {
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.dataFlow.collect { data ->
                    binding.textView.text = data
                }
            }
        }
    }
}

🔥 最佳实践:始终使用 repeatOnLifecycle 而非直接在 lifecycleScopecollect,前者在 STOPPED 时自动挂起收集,节省资源。

2.3 自定义 CoroutineScope

class DataLoader {
    private val scope = CoroutineScope(
        SupervisorJob() + Dispatchers.IO + CoroutineName("DataLoader")
    )

    fun load() = scope.launch {
        // 在这个 scope 下工作
    }

    fun destroy() {
        scope.cancel()  // 一次性取消该 scope 下所有协程
    }
}

📊 作用域速查表

Scope 使用场景 生命周期
viewModelScope ViewModel ViewModel.onCleared()
lifecycleScope Activity/Fragment onDestroy
rememberCoroutineScope() Compose 组件离屏
CoroutineScope(Job()) 自定义(非 UI) 手动 cancel()
GlobalScope 🚫 永不使用 应用进程

三、SupervisorJob vs Job

// ❌ 普通 Job:一个子协程失败 → 兄弟协程全部取消
fun normalJob() = runBlocking {
    val scope = CoroutineScope(Job())
    scope.launch {
        delay(500)
        throw RuntimeException("💥 我挂了!")
    }
    scope.launch {
        delay(1000)
        println("我永远不会被打印 😢")
    }
    delay(2000)
}

// ✅ SupervisorJob:一个子协程失败 → 兄弟协程不受影响
fun supervisorJob() = runBlocking {
    val scope = CoroutineScope(SupervisorJob())
    scope.launch {
        delay(500)
        throw RuntimeException("💥 我挂了!")
    }
    scope.launch {
        delay(1000)
        println("我还能正常执行 ✅")
    }
    delay(2000)
}

// supervisorScope:暂停函数版
suspend fun loadSafely() = supervisorScope {
    val d1 = async { /* 可能失败的任务1 */ }
    val d2 = async { /* 可能失败的任务2 */ }
    // d1 失败不会取消 d2
}

四、协程异常处理

方式1:try-catch(最直接)

fun tryCatchDemo() = runBlocking {
    val job = launch {
        try {
            riskyOperation()
        } catch (e: Exception) {
            println("捕获异常: ${e.message}")
        }
    }
}

方式2:CoroutineExceptionHandler(全局捕获)

val handler = CoroutineExceptionHandler { _, exception ->
    println("全局捕获: ${exception.message}")
}

fun handlerDemo() = runBlocking {
    val scope = CoroutineScope(Job() + handler)

    scope.launch {
        throw RuntimeException("未捕获的异常")
    }

    // ⚠️ handler 对 async 的异常无效!
    // async 的异常包裹在 Deferred 中,调用 await() 时才抛出
    scope.launch {
        val deferred = async {
            throw RuntimeException("async 中的异常")
        }
        try {
            deferred.await()  // ← 异常在这里抛出
        } catch (e: Exception) {
            println("async 异常需在 await() 处捕获")
        }
    }
}

方式3:supervisorScope(隔离异常传播)

suspend fun supervisorScopeDemo() = supervisorScope {
    launch {
        throw RuntimeException("子协程异常")  // 不会影响兄弟协程
    }
    launch {
        delay(500)
        println("兄弟协程正常执行 ✅")
    }
}

⚠️ 异常处理要点

  • launch 的异常 → 立即传播,可用 CoroutineExceptionHandler 或 try-catch
  • async 的异常 → 包裹在 Deferred 中,调用 await() 时才抛出
  • CancellationException 不会被 CoroutineExceptionHandler 捕获

五、Flow 基础

🔥 Flow = 协程版的响应式流,替代 RxJava 的首选。

5.1 Flow 三大特性

特性 说明
❄️ 冷流 有收集者才发射数据
🧹 自动生命周期 收集者取消 → 流自动停止
🔙 背压支持 下游处理不过来时自动挂起

5.2 创建与收集 Flow

import kotlinx.coroutines.flow.*

// 创建
val simpleFlow: Flow<Int> = flow {
    for (i in 1..5) {
        delay(500)
        emit(i)
    }
}

// 收集
fun collectDemo() = runBlocking {
    simpleFlow.collect { value ->
        println("收到: $value")
    }
}
// 每秒输出两个数:收到 1 → 收到 2 → ... → 收到 5

5.3 构建 Flow 的 5 种方式

val flow1 = flow { emit(1); emit(2) }              // 最通用
val flow2 = flowOf(1, 2, 3)                        // 固定值
val flow3 = listOf(1, 2, 3).asFlow()               // 集合转 Flow
val flow4 = (1..5).asFlow()                         // Range 转 Flow
val flow5 = flow {                                  // 无限流
    var i = 0
    while (true) {
        emit(i++)
        delay(1000)
    }
}

六、Flow 操作符大全

val nums = (1..10).asFlow()

// ═══ 变换 ═══
nums.map { it * 10 }                    // 映射
nums.filter { it % 2 == 0 }             // 过滤
nums.transform {                        // 通用变换,可多次 emit
    emit(it)
    emit(it * 100)
}

// ═══ 截取 ═══
nums.take(3)                            // 取前 3 个后自动取消流
nums.takeWhile { it < 7 }               // 条件为 true 时继续取
nums.drop(3)                            // 跳过前 3 个

// ═══ 线程切换 ═══
flow {
    emit(loadFromDb())       // ← 在 IO 线程
}.flowOn(Dispatchers.IO)     // 指定上游线程
 .map { it.transform() }     // ← 可切换回当前线程

// ═══ 背压处理 ═══
nums.buffer(3)                          // 设置缓冲区
nums.conflate()                         // 只保留最新值
nums.collectLatest { value ->           // 新值到来时取消上次收集
    delay(300)
    println(value)
}

// ═══ 副作用 ═══
nums.onStart { println("流开始") }
    .onEach { println("发射: $it") }
    .onCompletion { e ->
        e?.let { println("异常结束: $it") }
            ?: println("正常结束")
    }
    .catch { e -> emit(-1) }
    .collect { /* ... */ }

七、StateFlow — 有状态的热流

🔥 替代 LiveData 的首选,Google 官方推荐。

7.1 StateFlow 五大特征

热流:始终存在,不依赖收集者
必须有初始值
自动 conflate:只保存最新值
值相等不通知:通过 equals 判断
线程安全update {} 保证原子性

7.2 基础使用

class CounterViewModel : ViewModel() {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        _count.update { it + 1 }  // 原子操作,线程安全
    }
}

7.3 Activity 中收集

class CounterActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // 方式1:普通 collect
        lifecycleScope.launch {
            viewModel.count.collect { value ->
                binding.textView.text = "计数: $value"
            }
        }

        // 方式2:repeatOnLifecycle(官方推荐!)
        lifecycleScope.launch {
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.count.collect { value ->
                    binding.textView.text = "计数: $value"
                }
            }
        }
    }
}

7.4 Compose 中收集

@Composable
fun CounterScreen(viewModel: CounterViewModel = viewModel()) {
    val count by viewModel.count.collectAsStateWithLifecycle()

    Text(text = "计数: $count")
    Button(onClick = { viewModel.increment() }) {
        Text("+1")
    }
}

7.5 实战:单数据源模式

class UserRepository(
    private val api: ApiService,
    private val db: UserDao
) {
    private val _user = MutableStateFlow<User?>(null)
    val user: StateFlow<User?> = _user.asStateFlow()

    suspend fun refreshUser(id: String) {
        _user.value = db.getUser(id)     // 1. 先从缓存展示
        try {
            val fresh = api.fetchUser(id)  // 2. 网络刷新
            db.insertUser(fresh)           // 3. 更新缓存
            _user.value = fresh            // 4. 更新 UI
        } catch (e: Exception) {
            // 失败:保持缓存数据,不覆盖
            Log.e("UserRepo", "刷新失败", e)
        }
    }
}

八、SharedFlow — 事件总线

🔥 用于一次性事件(Snackbar、导航、Toast)。

8.1 StateFlow vs SharedFlow

StateFlow SharedFlow
用途 状态持有 事件发射
初始值 必须有 无需
新收集者 立即收到最新值 不收到历史事件
值重复 不通知 每次都通知

8.2 基础使用

class EventViewModel : ViewModel() {
    private val _events = MutableSharedFlow<UiEvent>(
        replay = 0,                        // 不重放历史事件
        extraBufferCapacity = 1,           // 缓冲区
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val events: SharedFlow<UiEvent> = _events.asSharedFlow()

    fun performAction() {
        viewModelScope.launch {
            _events.emit(UiEvent.ShowSnackbar("操作成功 ✅"))
        }
    }

    // tryEmit:非挂起,适合性能敏感场景
    fun quickEmit(event: UiEvent) {
        _events.tryEmit(event)
    }
}

8.3 事件定义与收集

// 事件定义
sealed class UiEvent {
    data class ShowSnackbar(val message: String) : UiEvent()
    data class NavigateTo(val route: String) : UiEvent()
    data object FinishActivity : UiEvent()
}

// Activity 中收集
class EventActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        lifecycleScope.launch {
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.events.collect { event ->
                    when (event) {
                        is UiEvent.ShowSnackbar ->
                            Snackbar.make(binding.root, event.message, Snackbar.LENGTH_SHORT).show()
                        is UiEvent.NavigateTo ->
                            findNavController().navigate(event.route)
                        is UiEvent.FinishActivity ->
                            finish()
                    }
                }
            }
        }
    }
}

九、Flow 异常处理与重试

9.1 catch — 拦截并恢复

fun catchDemo() = runBlocking {
    flow {
        emit(1)
        throw RuntimeException("💥 发射失败!")
        emit(2)  // 不会执行
    }
    .catch { e ->
        println("捕获: ${e.message}")
        emit(-1)  // 用默认值恢复
    }
    .collect { println(it) }
}
// 输出:1 → 捕获: 💥 发射失败!→ -1

9.2 retry — 失败后重试

fun retryDemo() = runBlocking {
    var attempt = 0
    flow {
        emit(attempt++)
        if (attempt < 3) throw RuntimeException("第${attempt}次失败")
        emit(100)
    }
    .retry(3) { e ->
        println("重试: ${e.message}")
        delay(500)
        true  // 返回 true 才重试
    }
    .collect { println("最终: $it") }
}
// 最终: 0 → 重试: 第1次 → 最终: 1 → 重试: 第2次 → 最终: 100

9.3 retryWhen — 指数退避重试

fun retryWhenDemo() = runBlocking {
    var count = 0
    flow {
        if (count++ < 2) throw IOException("网络错误")
        emit("数据")
    }
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay((1L shl attempt.toInt()) * 100L) // 100→200→400ms
            true
        } else false
    }
    .collect { println(it) }
}

十、Flow 组合操作

10.1 combine — 任意流更新即组合

fun combineDemo() = runBlocking {
    val nums = flowOf(1, 2, 3).onEach { delay(300) }
    val strs = flowOf("A", "B", "C").onEach { delay(400) }

    nums.combine(strs) { n, s -> "$n$s" }
        .collect { println(it) }
}
// 输出:1A, 2A, 3A, 3B, 3C

10.2 zip — 严格配对

fun zipDemo() = runBlocking {
    val nums = flowOf(1, 2, 3).onEach { delay(100) }
    val strs = flowOf("A", "B", "C").onEach { delay(200) }

    nums.zip(strs) { n, s -> "$n$s" }
        .collect { println(it) }
}
// 输出:1A, 2B, 3C(快的等慢的)

10.3 merge — 合并(先到先得)

fun mergeDemo() = runBlocking {
    val f1 = flowOf(1, 2).onEach { delay(100) }
    val f2 = flowOf(3, 4).onEach { delay(150) }

    merge(f1, f2).collect { println(it) }
}
// 输出:1, 3, 2, 4

10.4 flatMapLatest — 切换流(搜索防抖核心)

fun flatMapLatestDemo() = runBlocking {
    flow {
        emit("k")
        delay(100)
        emit("ko")    // → 上一个搜索自动取消!
        delay(100)
        emit("kot")   // → 上一个搜索自动取消!
    }
    .flatMapLatest { query -> flow { emit("搜索: $query") } }
    .collect { println(it) }
}

📊 时间轴对比

combine:
nums ──1────2────3──────────
strs ──────A────B────C──────
输出 ──1A───2A───3A──3B──3C

zip:
nums ──1────2────3──────────
strs ──────A────B────C──────
输出 ──1A───2B───3C

十一、callbackFlow — 桥接回调 API

// 🔥 将传统回调转为 Flow

// 案例1:点击事件转 Flow
fun View.clicks(): Flow<Unit> = callbackFlow {
    val listener = View.OnClickListener { trySend(Unit) }
    setOnClickListener(listener)
    awaitClose { setOnClickListener(null) }  // 清理关键!
}

// 案例2:位置监听转 Flow
fun LocationManager.locationUpdates(
    minTimeMs: Long,
    minDistanceM: Float
): Flow<Location> = callbackFlow {
    val listener = LocationListener { location -> trySend(location) }
    requestLocationUpdates(LocationManager.GPS_PROVIDER, minTimeMs, minDistanceM, listener)
    awaitClose { removeUpdates(listener) }
}

// 案例3:Room 数据库直接返回 Flow
@Dao
interface UserDao {
    @Query("SELECT * FROM users WHERE id = :userId")
    fun observeUser(userId: String): Flow<User?>

    @Query("SELECT * FROM users")
    fun observeAllUsers(): Flow<List<User>>
}

十二、Channel — 协程间通信

// 基本使用
fun channelDemo() = runBlocking {
    val channel = Channel<Int>(capacity = 2)

    // 生产者
    launch {
        for (i in 1..5) {
            println("发送 $i")
            channel.send(i)  // 缓冲区满时挂起
        }
        channel.close()
    }

    delay(1000)

    // 消费者
    for (value in channel) {  // 循环直到 close
        println("收到 $value")
        delay(500)
    }
}

// produce 构建器
fun produceDemo() = runBlocking {
    val channel = produce<Int> {
        for (i in 1..5) {
            send(i)
            delay(100)
        }
    }
    channel.consumeEach { println(it) }
}

Channel 类型

类型 行为
Channel.BUFFERED 默认缓冲区(64)
Channel.RENDEZVOUS 无缓冲区,发送方等接收方
Channel.CONFLATED 只保留最新值
Channel.UNLIMITED 无限缓冲区(⚠️ 慎用)

十三、项目最佳实践(5 大模式)

模式一:网络 + 缓存 + StateFlow

class ArticleRepository(
    private val api: ArticleApi,
    private val db: ArticleDao
) {
    private val _articles = MutableStateFlow<List<Article>>(emptyList())
    val articles: StateFlow<List<Article>> = _articles.asStateFlow()

    suspend fun loadArticles(forceRefresh: Boolean = false) {
        val cached = db.getAllArticles()
        _articles.value = cached  // 1. 先从缓存展示

        if (!forceRefresh && cached.isNotEmpty() && !isStale()) return

        try {
            val fresh = api.getArticles()  // 2. 网络刷新
            db.insertAll(fresh)
            _articles.value = fresh
        } catch (e: Exception) {
            if (_articles.value.isEmpty()) throw e  // 3. 无缓存才抛异常
        }
    }
}

模式二:搜索防抖

class SearchViewModel(
    private val repository: SearchRepository
) : ViewModel() {

    private val _query = MutableStateFlow("")
    val searchResults: StateFlow<List<String>>

    init {
        searchResults = _query
            .debounce(300)                          // 300ms 防抖
            .filter { it.length >= 2 }              // 至少 2 个字符
            .distinctUntilChanged()                  // 去重
            .flatMapLatest { query ->               // 取消旧搜索
                flow { emit(repository.search(query)) }
                    .flowOn(Dispatchers.IO)
            }
            .stateIn(
                scope = viewModelScope,
                started = SharingStarted.WhileSubscribed(5000),
                initialValue = emptyList()
            )
    }

    fun onQueryChanged(query: String) {
        _query.value = query
    }
}

模式三:并行任务 + 超时 + 重试

// 并行请求
suspend fun loadDashboardData(): DashboardData = coroutineScope {
    val users = async { userRepo.getUsers() }
    val stats = async { statsRepo.getStats() }
    val config = async { configRepo.getConfig() }
    DashboardData(users.await(), stats.await(), config.await())
}

// 超时保护
suspend fun loadWithTimeout(): DashboardData =
    withTimeout(5000) { loadDashboardData() }
        ?: throw TimeoutException("加载超时")

// 重试机制
suspend fun loadWithRetry(retries: Int = 3): DashboardData {
    repeat(retries - 1) {
        try { return loadWithTimeout() }
        catch (e: TimeoutException) { delay(500) }
    }
    return loadWithTimeout()
}

模式四:轮询

fun pollUpdates(intervalMs: Long): Flow<Status> = flow {
    while (true) {
        emit(fetchStatus())
        delay(intervalMs)
    }
}.flowOn(Dispatchers.IO)

模式五:多数据源合并

class HomeViewModel(
    private val userRepo: UserRepository,
    private val articleRepo: ArticleRepository,
    private val notificationRepo: NotificationRepository
) : ViewModel() {

    val homeState: StateFlow<HomeUiState> = combine(
        userRepo.currentUser,         // Flow<User?>
        articleRepo.latestArticles,   // Flow<List<Article>>
        notificationRepo.unreadCount  // Flow<Int>
    ) { user, articles, unread ->
        HomeUiState(
            userName = user?.name ?: "未登录",
            articles = articles,
            unreadBadge = unread
        )
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = HomeUiState()
    )
}

十四、速查表

需求 方案
并发两个请求 coroutineScope { async{...}; async{...} }
不关心返回值 launch { ... }
阻塞等待结果 runBlocking { ... }(仅 main/测试)
切换线程 withContext(Dispatchers.IO) { ... }
子协程互不影响 SupervisorJob()supervisorScope
全局异常处理 CoroutineExceptionHandler
一次性事件 SharedFlow
有状态的数据 StateFlow
数据库监听 Room DAO 返回 Flow<T>
搜索防抖 debounce() + flatMapLatest()
回调转协程 callbackFlow { ... }
多个流合并 combine() / zip() / merge()
失败重试 retry() / retryWhen()
Flow → StateFlow .stateIn(scope, SharingStarted.WhileSubscribed(), initial)
Flow → SharedFlow .shareIn(scope, SharingStarted.WhileSubscribed())
Compose 收集 collectAsStateWithLifecycle()
超时控制 withTimeout() / withTimeoutOrNull()

📚 参考资源

资源 链接
🏠 协程官方指南 kotlinlang.org/docs/coroutines-guide.html
🌊 Flow 官方文档 kotlinlang.org/docs/flow.html
🤖 Android 协程最佳实践 developer.android.com/kotlin/coroutines
📖 《Kotlin 协程》 Roman Elizarov 著

💬 写在最后:协程和 Flow 是 Kotlin 异步编程的双子星。协程解决"怎么异步",Flow 解决"数据怎么流"。掌握这两者,无论是 Android 开发还是后端开发都能得心应手。

如果有帮助,欢迎 点赞 👍 | 收藏 ⭐ | 关注 🔔
有问题评论交流~


🔄 最近更新于 2025 年 6 月 | kotlinx-coroutines 1.8+ 适用

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐