协程与Flow实战完全指南_CSDN
🌀 Kotlin 协程与 Flow 实战完全指南
📌 适合人群:有 Kotlin 基础,需要深入掌握协程和 Flow 的 Android/后端开发者
📌 阅读时长:约 25 分钟
📌 环境:Kotlin 2.0+ / kotlinx-coroutines 1.8+
文章目录
📖 前言
协程是 Kotlin 异步编程的基石,Flow 是响应式数据流的标准答案。本文聚焦实际项目中真正用得到的知识点,每个概念都配有可直接运行的代码示例。
💡 学完本文你将掌握:
- 4种协程构建器的选择策略
- Android 生命周期感知的作用域管理
- StateFlow 替代 LiveData 的完整方案
- SharedFlow 事件总线模式
- Flow 操作符链 + 异常重试
- 5 种经过验证的项目实战模式
一、协程构建器
Kotlin 提供 4 种协程构建器,各有适用场景:
1.1 runBlocking — 桥接阻塞世界
// ⚠️ 仅用于 main 函数 / 单元测试,严禁在 Android 主线程使用!
fun main() = runBlocking {
println("Hello")
delay(1000)
println("World")
}
1.2 launch — 发射后不管
// 返回 Job,可控制取消
fun testLaunch() = runBlocking {
val job: Job = launch {
repeat(1000) { i ->
println("打印 $i")
delay(500)
}
}
delay(1300)
job.cancel() // 1.3秒后取消
job.join() // 等待取消完成
println("已取消")
}
1.3 async — 需要返回值
// 返回 Deferred(继承 Job),通过 await() 获取结果
fun testAsync() = runBlocking {
val deferred1 = async {
delay(1000)
"结果1"
}
val deferred2 = async {
delay(800)
"结果2"
}
// await() 挂起等待但不阻塞线程
println("${deferred1.await()} + ${deferred2.await()}") // 总耗时≈1000ms
}
1.4 coroutineScope — 结构化并发
// 任一子协程失败 → 所有子协程自动取消
suspend fun loadData() = coroutineScope {
val user = async { fetchUser() }
val order = async { fetchOrder() }
"${user.await()} | ${order.await()}"
}
📊 四种构建器对比
| 构建器 | 返回值 | 是否阻塞 | 使用场景 |
|---|---|---|---|
runBlocking |
任意 | ✅ 阻塞线程 | main / 测试 |
launch |
Job |
❌ | 不关心返回值 |
async |
Deferred<T> |
❌ | 需要返回值 |
coroutineScope |
任意 | ❌ | 结构化并发 |
二、协程作用域
⚠️ 黄金法则:绝不使用
GlobalScope!始终使用有生命周期的 Scope。
2.1 ViewModel 中的标准写法
class MyViewModel : ViewModel() {
// viewModelScope:ViewModel 清除时自动取消所有协程
fun loadData() {
viewModelScope.launch(Dispatchers.IO) {
val data = repository.fetchData()
withContext(Dispatchers.Main) {
_uiState.value = data // 切回主线程更新 UI
}
}
}
}
2.2 Activity / Fragment 中
class MyActivity : AppCompatActivity() {
// lifecycleScope:生命周期感知,onDestroy 时自动取消
fun observeData() {
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.dataFlow.collect { data ->
binding.textView.text = data
}
}
}
}
}
🔥 最佳实践:始终使用
repeatOnLifecycle而非直接在lifecycleScope中collect,前者在 STOPPED 时自动挂起收集,节省资源。
2.3 自定义 CoroutineScope
class DataLoader {
private val scope = CoroutineScope(
SupervisorJob() + Dispatchers.IO + CoroutineName("DataLoader")
)
fun load() = scope.launch {
// 在这个 scope 下工作
}
fun destroy() {
scope.cancel() // 一次性取消该 scope 下所有协程
}
}
📊 作用域速查表
| Scope | 使用场景 | 生命周期 |
|---|---|---|
viewModelScope |
ViewModel | ViewModel.onCleared() |
lifecycleScope |
Activity/Fragment | onDestroy |
rememberCoroutineScope() |
Compose | 组件离屏 |
CoroutineScope(Job()) |
自定义(非 UI) | 手动 cancel() |
GlobalScope |
🚫 永不使用 | 应用进程 |
三、SupervisorJob vs Job
// ❌ 普通 Job:一个子协程失败 → 兄弟协程全部取消
fun normalJob() = runBlocking {
val scope = CoroutineScope(Job())
scope.launch {
delay(500)
throw RuntimeException("💥 我挂了!")
}
scope.launch {
delay(1000)
println("我永远不会被打印 😢")
}
delay(2000)
}
// ✅ SupervisorJob:一个子协程失败 → 兄弟协程不受影响
fun supervisorJob() = runBlocking {
val scope = CoroutineScope(SupervisorJob())
scope.launch {
delay(500)
throw RuntimeException("💥 我挂了!")
}
scope.launch {
delay(1000)
println("我还能正常执行 ✅")
}
delay(2000)
}
// supervisorScope:暂停函数版
suspend fun loadSafely() = supervisorScope {
val d1 = async { /* 可能失败的任务1 */ }
val d2 = async { /* 可能失败的任务2 */ }
// d1 失败不会取消 d2
}
四、协程异常处理
方式1:try-catch(最直接)
fun tryCatchDemo() = runBlocking {
val job = launch {
try {
riskyOperation()
} catch (e: Exception) {
println("捕获异常: ${e.message}")
}
}
}
方式2:CoroutineExceptionHandler(全局捕获)
val handler = CoroutineExceptionHandler { _, exception ->
println("全局捕获: ${exception.message}")
}
fun handlerDemo() = runBlocking {
val scope = CoroutineScope(Job() + handler)
scope.launch {
throw RuntimeException("未捕获的异常")
}
// ⚠️ handler 对 async 的异常无效!
// async 的异常包裹在 Deferred 中,调用 await() 时才抛出
scope.launch {
val deferred = async {
throw RuntimeException("async 中的异常")
}
try {
deferred.await() // ← 异常在这里抛出
} catch (e: Exception) {
println("async 异常需在 await() 处捕获")
}
}
}
方式3:supervisorScope(隔离异常传播)
suspend fun supervisorScopeDemo() = supervisorScope {
launch {
throw RuntimeException("子协程异常") // 不会影响兄弟协程
}
launch {
delay(500)
println("兄弟协程正常执行 ✅")
}
}
⚠️ 异常处理要点:
launch的异常 → 立即传播,可用CoroutineExceptionHandler或 try-catchasync的异常 → 包裹在Deferred中,调用await()时才抛出CancellationException不会被CoroutineExceptionHandler捕获
五、Flow 基础
🔥 Flow = 协程版的响应式流,替代 RxJava 的首选。
5.1 Flow 三大特性
| 特性 | 说明 |
|---|---|
| ❄️ 冷流 | 有收集者才发射数据 |
| 🧹 自动生命周期 | 收集者取消 → 流自动停止 |
| 🔙 背压支持 | 下游处理不过来时自动挂起 |
5.2 创建与收集 Flow
import kotlinx.coroutines.flow.*
// 创建
val simpleFlow: Flow<Int> = flow {
for (i in 1..5) {
delay(500)
emit(i)
}
}
// 收集
fun collectDemo() = runBlocking {
simpleFlow.collect { value ->
println("收到: $value")
}
}
// 每秒输出两个数:收到 1 → 收到 2 → ... → 收到 5
5.3 构建 Flow 的 5 种方式
val flow1 = flow { emit(1); emit(2) } // 最通用
val flow2 = flowOf(1, 2, 3) // 固定值
val flow3 = listOf(1, 2, 3).asFlow() // 集合转 Flow
val flow4 = (1..5).asFlow() // Range 转 Flow
val flow5 = flow { // 无限流
var i = 0
while (true) {
emit(i++)
delay(1000)
}
}
六、Flow 操作符大全
val nums = (1..10).asFlow()
// ═══ 变换 ═══
nums.map { it * 10 } // 映射
nums.filter { it % 2 == 0 } // 过滤
nums.transform { // 通用变换,可多次 emit
emit(it)
emit(it * 100)
}
// ═══ 截取 ═══
nums.take(3) // 取前 3 个后自动取消流
nums.takeWhile { it < 7 } // 条件为 true 时继续取
nums.drop(3) // 跳过前 3 个
// ═══ 线程切换 ═══
flow {
emit(loadFromDb()) // ← 在 IO 线程
}.flowOn(Dispatchers.IO) // 指定上游线程
.map { it.transform() } // ← 可切换回当前线程
// ═══ 背压处理 ═══
nums.buffer(3) // 设置缓冲区
nums.conflate() // 只保留最新值
nums.collectLatest { value -> // 新值到来时取消上次收集
delay(300)
println(value)
}
// ═══ 副作用 ═══
nums.onStart { println("流开始") }
.onEach { println("发射: $it") }
.onCompletion { e ->
e?.let { println("异常结束: $it") }
?: println("正常结束")
}
.catch { e -> emit(-1) }
.collect { /* ... */ }
七、StateFlow — 有状态的热流
🔥 替代 LiveData 的首选,Google 官方推荐。
7.1 StateFlow 五大特征
① 热流:始终存在,不依赖收集者
② 必须有初始值
③ 自动 conflate:只保存最新值
④ 值相等不通知:通过 equals 判断
⑤ 线程安全:update {} 保证原子性
7.2 基础使用
class CounterViewModel : ViewModel() {
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count.asStateFlow()
fun increment() {
_count.update { it + 1 } // 原子操作,线程安全
}
}
7.3 Activity 中收集
class CounterActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// 方式1:普通 collect
lifecycleScope.launch {
viewModel.count.collect { value ->
binding.textView.text = "计数: $value"
}
}
// 方式2:repeatOnLifecycle(官方推荐!)
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.count.collect { value ->
binding.textView.text = "计数: $value"
}
}
}
}
}
7.4 Compose 中收集
@Composable
fun CounterScreen(viewModel: CounterViewModel = viewModel()) {
val count by viewModel.count.collectAsStateWithLifecycle()
Text(text = "计数: $count")
Button(onClick = { viewModel.increment() }) {
Text("+1")
}
}
7.5 实战:单数据源模式
class UserRepository(
private val api: ApiService,
private val db: UserDao
) {
private val _user = MutableStateFlow<User?>(null)
val user: StateFlow<User?> = _user.asStateFlow()
suspend fun refreshUser(id: String) {
_user.value = db.getUser(id) // 1. 先从缓存展示
try {
val fresh = api.fetchUser(id) // 2. 网络刷新
db.insertUser(fresh) // 3. 更新缓存
_user.value = fresh // 4. 更新 UI
} catch (e: Exception) {
// 失败:保持缓存数据,不覆盖
Log.e("UserRepo", "刷新失败", e)
}
}
}
八、SharedFlow — 事件总线
🔥 用于一次性事件(Snackbar、导航、Toast)。
8.1 StateFlow vs SharedFlow
| StateFlow | SharedFlow | |
|---|---|---|
| 用途 | 状态持有 | 事件发射 |
| 初始值 | 必须有 | 无需 |
| 新收集者 | 立即收到最新值 | 不收到历史事件 |
| 值重复 | 不通知 | 每次都通知 |
8.2 基础使用
class EventViewModel : ViewModel() {
private val _events = MutableSharedFlow<UiEvent>(
replay = 0, // 不重放历史事件
extraBufferCapacity = 1, // 缓冲区
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val events: SharedFlow<UiEvent> = _events.asSharedFlow()
fun performAction() {
viewModelScope.launch {
_events.emit(UiEvent.ShowSnackbar("操作成功 ✅"))
}
}
// tryEmit:非挂起,适合性能敏感场景
fun quickEmit(event: UiEvent) {
_events.tryEmit(event)
}
}
8.3 事件定义与收集
// 事件定义
sealed class UiEvent {
data class ShowSnackbar(val message: String) : UiEvent()
data class NavigateTo(val route: String) : UiEvent()
data object FinishActivity : UiEvent()
}
// Activity 中收集
class EventActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.events.collect { event ->
when (event) {
is UiEvent.ShowSnackbar ->
Snackbar.make(binding.root, event.message, Snackbar.LENGTH_SHORT).show()
is UiEvent.NavigateTo ->
findNavController().navigate(event.route)
is UiEvent.FinishActivity ->
finish()
}
}
}
}
}
}
九、Flow 异常处理与重试
9.1 catch — 拦截并恢复
fun catchDemo() = runBlocking {
flow {
emit(1)
throw RuntimeException("💥 发射失败!")
emit(2) // 不会执行
}
.catch { e ->
println("捕获: ${e.message}")
emit(-1) // 用默认值恢复
}
.collect { println(it) }
}
// 输出:1 → 捕获: 💥 发射失败!→ -1
9.2 retry — 失败后重试
fun retryDemo() = runBlocking {
var attempt = 0
flow {
emit(attempt++)
if (attempt < 3) throw RuntimeException("第${attempt}次失败")
emit(100)
}
.retry(3) { e ->
println("重试: ${e.message}")
delay(500)
true // 返回 true 才重试
}
.collect { println("最终: $it") }
}
// 最终: 0 → 重试: 第1次 → 最终: 1 → 重试: 第2次 → 最终: 100
9.3 retryWhen — 指数退避重试
fun retryWhenDemo() = runBlocking {
var count = 0
flow {
if (count++ < 2) throw IOException("网络错误")
emit("数据")
}
.retryWhen { cause, attempt ->
if (cause is IOException && attempt < 3) {
delay((1L shl attempt.toInt()) * 100L) // 100→200→400ms
true
} else false
}
.collect { println(it) }
}
十、Flow 组合操作
10.1 combine — 任意流更新即组合
fun combineDemo() = runBlocking {
val nums = flowOf(1, 2, 3).onEach { delay(300) }
val strs = flowOf("A", "B", "C").onEach { delay(400) }
nums.combine(strs) { n, s -> "$n$s" }
.collect { println(it) }
}
// 输出:1A, 2A, 3A, 3B, 3C
10.2 zip — 严格配对
fun zipDemo() = runBlocking {
val nums = flowOf(1, 2, 3).onEach { delay(100) }
val strs = flowOf("A", "B", "C").onEach { delay(200) }
nums.zip(strs) { n, s -> "$n$s" }
.collect { println(it) }
}
// 输出:1A, 2B, 3C(快的等慢的)
10.3 merge — 合并(先到先得)
fun mergeDemo() = runBlocking {
val f1 = flowOf(1, 2).onEach { delay(100) }
val f2 = flowOf(3, 4).onEach { delay(150) }
merge(f1, f2).collect { println(it) }
}
// 输出:1, 3, 2, 4
10.4 flatMapLatest — 切换流(搜索防抖核心)
fun flatMapLatestDemo() = runBlocking {
flow {
emit("k")
delay(100)
emit("ko") // → 上一个搜索自动取消!
delay(100)
emit("kot") // → 上一个搜索自动取消!
}
.flatMapLatest { query -> flow { emit("搜索: $query") } }
.collect { println(it) }
}
📊 时间轴对比
combine:
nums ──1────2────3──────────
strs ──────A────B────C──────
输出 ──1A───2A───3A──3B──3C
zip:
nums ──1────2────3──────────
strs ──────A────B────C──────
输出 ──1A───2B───3C
十一、callbackFlow — 桥接回调 API
// 🔥 将传统回调转为 Flow
// 案例1:点击事件转 Flow
fun View.clicks(): Flow<Unit> = callbackFlow {
val listener = View.OnClickListener { trySend(Unit) }
setOnClickListener(listener)
awaitClose { setOnClickListener(null) } // 清理关键!
}
// 案例2:位置监听转 Flow
fun LocationManager.locationUpdates(
minTimeMs: Long,
minDistanceM: Float
): Flow<Location> = callbackFlow {
val listener = LocationListener { location -> trySend(location) }
requestLocationUpdates(LocationManager.GPS_PROVIDER, minTimeMs, minDistanceM, listener)
awaitClose { removeUpdates(listener) }
}
// 案例3:Room 数据库直接返回 Flow
@Dao
interface UserDao {
@Query("SELECT * FROM users WHERE id = :userId")
fun observeUser(userId: String): Flow<User?>
@Query("SELECT * FROM users")
fun observeAllUsers(): Flow<List<User>>
}
十二、Channel — 协程间通信
// 基本使用
fun channelDemo() = runBlocking {
val channel = Channel<Int>(capacity = 2)
// 生产者
launch {
for (i in 1..5) {
println("发送 $i")
channel.send(i) // 缓冲区满时挂起
}
channel.close()
}
delay(1000)
// 消费者
for (value in channel) { // 循环直到 close
println("收到 $value")
delay(500)
}
}
// produce 构建器
fun produceDemo() = runBlocking {
val channel = produce<Int> {
for (i in 1..5) {
send(i)
delay(100)
}
}
channel.consumeEach { println(it) }
}
Channel 类型
| 类型 | 行为 |
|---|---|
Channel.BUFFERED |
默认缓冲区(64) |
Channel.RENDEZVOUS |
无缓冲区,发送方等接收方 |
Channel.CONFLATED |
只保留最新值 |
Channel.UNLIMITED |
无限缓冲区(⚠️ 慎用) |
十三、项目最佳实践(5 大模式)
模式一:网络 + 缓存 + StateFlow
class ArticleRepository(
private val api: ArticleApi,
private val db: ArticleDao
) {
private val _articles = MutableStateFlow<List<Article>>(emptyList())
val articles: StateFlow<List<Article>> = _articles.asStateFlow()
suspend fun loadArticles(forceRefresh: Boolean = false) {
val cached = db.getAllArticles()
_articles.value = cached // 1. 先从缓存展示
if (!forceRefresh && cached.isNotEmpty() && !isStale()) return
try {
val fresh = api.getArticles() // 2. 网络刷新
db.insertAll(fresh)
_articles.value = fresh
} catch (e: Exception) {
if (_articles.value.isEmpty()) throw e // 3. 无缓存才抛异常
}
}
}
模式二:搜索防抖
class SearchViewModel(
private val repository: SearchRepository
) : ViewModel() {
private val _query = MutableStateFlow("")
val searchResults: StateFlow<List<String>>
init {
searchResults = _query
.debounce(300) // 300ms 防抖
.filter { it.length >= 2 } // 至少 2 个字符
.distinctUntilChanged() // 去重
.flatMapLatest { query -> // 取消旧搜索
flow { emit(repository.search(query)) }
.flowOn(Dispatchers.IO)
}
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
}
fun onQueryChanged(query: String) {
_query.value = query
}
}
模式三:并行任务 + 超时 + 重试
// 并行请求
suspend fun loadDashboardData(): DashboardData = coroutineScope {
val users = async { userRepo.getUsers() }
val stats = async { statsRepo.getStats() }
val config = async { configRepo.getConfig() }
DashboardData(users.await(), stats.await(), config.await())
}
// 超时保护
suspend fun loadWithTimeout(): DashboardData =
withTimeout(5000) { loadDashboardData() }
?: throw TimeoutException("加载超时")
// 重试机制
suspend fun loadWithRetry(retries: Int = 3): DashboardData {
repeat(retries - 1) {
try { return loadWithTimeout() }
catch (e: TimeoutException) { delay(500) }
}
return loadWithTimeout()
}
模式四:轮询
fun pollUpdates(intervalMs: Long): Flow<Status> = flow {
while (true) {
emit(fetchStatus())
delay(intervalMs)
}
}.flowOn(Dispatchers.IO)
模式五:多数据源合并
class HomeViewModel(
private val userRepo: UserRepository,
private val articleRepo: ArticleRepository,
private val notificationRepo: NotificationRepository
) : ViewModel() {
val homeState: StateFlow<HomeUiState> = combine(
userRepo.currentUser, // Flow<User?>
articleRepo.latestArticles, // Flow<List<Article>>
notificationRepo.unreadCount // Flow<Int>
) { user, articles, unread ->
HomeUiState(
userName = user?.name ?: "未登录",
articles = articles,
unreadBadge = unread
)
}.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = HomeUiState()
)
}
十四、速查表
| 需求 | 方案 |
|---|---|
| 并发两个请求 | coroutineScope { async{...}; async{...} } |
| 不关心返回值 | launch { ... } |
| 阻塞等待结果 | runBlocking { ... }(仅 main/测试) |
| 切换线程 | withContext(Dispatchers.IO) { ... } |
| 子协程互不影响 | SupervisorJob() 或 supervisorScope |
| 全局异常处理 | CoroutineExceptionHandler |
| 一次性事件 | SharedFlow |
| 有状态的数据 | StateFlow |
| 数据库监听 | Room DAO 返回 Flow<T> |
| 搜索防抖 | debounce() + flatMapLatest() |
| 回调转协程 | callbackFlow { ... } |
| 多个流合并 | combine() / zip() / merge() |
| 失败重试 | retry() / retryWhen() |
| Flow → StateFlow | .stateIn(scope, SharingStarted.WhileSubscribed(), initial) |
| Flow → SharedFlow | .shareIn(scope, SharingStarted.WhileSubscribed()) |
| Compose 收集 | collectAsStateWithLifecycle() |
| 超时控制 | withTimeout() / withTimeoutOrNull() |
📚 参考资源
| 资源 | 链接 |
|---|---|
| 🏠 协程官方指南 | kotlinlang.org/docs/coroutines-guide.html |
| 🌊 Flow 官方文档 | kotlinlang.org/docs/flow.html |
| 🤖 Android 协程最佳实践 | developer.android.com/kotlin/coroutines |
| 📖 《Kotlin 协程》 | Roman Elizarov 著 |
💬 写在最后:协程和 Flow 是 Kotlin 异步编程的双子星。协程解决"怎么异步",Flow 解决"数据怎么流"。掌握这两者,无论是 Android 开发还是后端开发都能得心应手。
如果有帮助,欢迎 点赞 👍 | 收藏 ⭐ | 关注 🔔
有问题评论交流~
🔄 最近更新于 2025 年 6 月 | kotlinx-coroutines 1.8+ 适用
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)