01-01-04 协程调度器与线程完全指南
·
01-01-04 协程调度器与线程完全指南
文档概述
本文档深入讲解Kotlin协程的调度器机制与线程管理,包括调度器类型、线程切换、性能优化和deviceSecurity项目实战案例。
目录
- 1. 协程调度器基础
- 2. 四大标准调度器
- 3. 线程切换与withContext
- 4. 调度器的选择策略
- 5. 自定义调度器
- 6. 性能优化与最佳实践
- 7. deviceSecurity实战案例
- 8. 常见问题FAQ
1. 协程调度器基础
1.1 什么是协程调度器
协程调度器(CoroutineDispatcher)决定协程在哪个线程或线程池上执行。它是协程上下文(CoroutineContext)的一部分。
/**
* 调度器基本概念
*/
class DispatcherBasics {
// 1. 指定调度器启动协程
fun example1() {
CoroutineScope(Dispatchers.Main).launch {
println("运行在主线程: ${Thread.currentThread().name}")
}
}
// 2. 通过launch参数指定调度器
fun example2() {
CoroutineScope(Dispatchers.IO).launch(Dispatchers.Default) {
println("运行在Default线程池: ${Thread.currentThread().name}")
}
}
// 3. 查看当前调度器
suspend fun checkDispatcher() {
val dispatcher = coroutineContext[CoroutineDispatcher]
println("当前调度器: $dispatcher")
}
}
1.2 调度器的继承规则
/**
* 调度器继承规则
*/
class DispatcherInheritance {
fun demonstrateInheritance() {
// 父协程的调度器
CoroutineScope(Dispatchers.Main).launch {
println("父协程: ${Thread.currentThread().name}")
// 子协程继承父协程的调度器
launch {
println("子协程1(继承): ${Thread.currentThread().name}")
}
// 子协程可以指定不同的调度器
launch(Dispatchers.IO) {
println("子协程2(IO): ${Thread.currentThread().name}")
}
}
}
}
2. 四大标准调度器
2.1 Dispatchers.Main - UI线程调度器
用途:在Android主线程执行,用于更新UI
/**
* Dispatchers.Main 使用示例
*/
class MainDispatcherExample {
// ✅ 推荐: 在Main调度器更新UI
fun updateUI(viewModel: ViewModel) {
viewModel.viewModelScope.launch(Dispatchers.Main) {
val data = loadData() // 可能在IO线程
textView.text = data // 必须在主线程
}
}
// Dispatchers.Main.immediate - 立即执行
fun immediateExecution() {
CoroutineScope(Dispatchers.Main).launch {
// 如果已经在主线程,使用immediate避免不必要的调度
withContext(Dispatchers.Main.immediate) {
updateUIElement()
}
}
}
private suspend fun loadData(): String {
return withContext(Dispatchers.IO) {
// 网络或数据库操作
"Data loaded"
}
}
private fun updateUIElement() {
// UI更新操作
}
}
特点:
- 单线程执行
- 避免阻塞主线程,否则会导致ANR
Main.immediate:如果已在主线程则立即执行,避免重新调度
2.2 Dispatchers.IO - IO操作调度器
用途:文件读写、网络请求、数据库操作
/**
* Dispatchers.IO 使用示例
*/
class IODispatcherExample {
// 网络请求
suspend fun fetchDataFromNetwork(): String {
return withContext(Dispatchers.IO) {
val response = httpClient.get("https://api.example.com/data")
response.body()
}
}
// 文件读写
suspend fun readFromFile(file: File): String {
return withContext(Dispatchers.IO) {
file.readText()
}
}
// 数据库查询
suspend fun queryDatabase(dao: UserDao): List<User> {
return withContext(Dispatchers.IO) {
dao.getAllUsers()
}
}
// 多个IO操作并发执行
suspend fun parallelIOOperations() {
withContext(Dispatchers.IO) {
val result1 = async { fetchDataFromNetwork() }
val result2 = async { readFromFile(File("data.txt")) }
val result3 = async { queryDatabase(userDao) }
// 等待所有操作完成
val data = result1.await()
val fileContent = result2.await()
val users = result3.await()
}
}
}
特点:
- 共享线程池,默认最大64个线程(或CPU核心数,取较大值)
- 线程可复用,适合IO密集型任务
- 阻塞操作不会影响其他协程
2.3 Dispatchers.Default - CPU密集型调度器
用途:计算密集型任务、数据处理、算法运算
/**
* Dispatchers.Default 使用示例
*/
class DefaultDispatcherExample {
// CPU密集型计算
suspend fun calculateHash(data: ByteArray): String {
return withContext(Dispatchers.Default) {
MessageDigest.getInstance("SHA-256")
.digest(data)
.joinToString("") { "%02x".format(it) }
}
}
// 大数据排序
suspend fun sortLargeList(list: List<Int>): List<Int> {
return withContext(Dispatchers.Default) {
list.sorted()
}
}
// 图像处理
suspend fun processImage(bitmap: Bitmap): Bitmap {
return withContext(Dispatchers.Default) {
// CPU密集的图像处理算法
applyFilter(bitmap)
}
}
// 并行计算
suspend fun parallelComputation(data: List<Int>): List<Int> {
return withContext(Dispatchers.Default) {
data.chunked(data.size / 4) // 分成4块
.map { chunk ->
async { processChunk(chunk) }
}
.flatMap { it.await() }
}
}
private fun applyFilter(bitmap: Bitmap): Bitmap {
// 图像处理逻辑
return bitmap
}
private fun processChunk(chunk: List<Int>): List<Int> {
// 处理数据块
return chunk.map { it * 2 }
}
}
特点:
- 线程池大小等于CPU核心数
- 适合不阻塞的计算密集型任务
- 与IO调度器共享线程池资源
2.4 Dispatchers.Unconfined - 非受限调度器
用途:测试和特殊场景,生产环境慎用
/**
* Dispatchers.Unconfined 使用示例
*/
class UnconfinedDispatcherExample {
fun demonstrateUnconfined() {
runBlocking {
println("主线程: ${Thread.currentThread().name}")
launch(Dispatchers.Unconfined) {
println("启动时: ${Thread.currentThread().name}") // 主线程
delay(100)
println("恢复后: ${Thread.currentThread().name}") // kotlinx.coroutines.DefaultExecutor
}
}
}
// ⚠️ 不推荐在生产环境使用
// 行为不可预测,难以调试
fun badExample() {
GlobalScope.launch(Dispatchers.Unconfined) {
// 第一次挂起前在调用者线程
println("Before suspend: ${Thread.currentThread().name}")
withContext(Dispatchers.IO) {
// IO操作
}
// 恢复后在IO线程池的某个线程
println("After suspend: ${Thread.currentThread().name}")
}
}
}
特点:
- 启动时在调用者线程执行
- 挂起后恢复时在恢复协程的线程执行
- 行为不确定,难以追踪和调试
- 不推荐在生产代码中使用
3. 线程切换与withContext
3.1 withContext基础
withContext 是切换调度器的标准方式,它会:
- 切换到指定调度器
- 执行代码块
- 返回结果
- 自动切换回原调度器
/**
* withContext 基础用法
*/
class WithContextBasics {
// 典型的MVVM模式
class UserViewModel : ViewModel() {
private val _users = MutableStateFlow<List<User>>(emptyList())
val users: StateFlow<List<User>> = _users.asStateFlow()
fun loadUsers() {
viewModelScope.launch {
// 1. 在主线程启动
_users.value = emptyList()
try {
// 2. 切换到IO线程进行网络请求
val result = withContext(Dispatchers.IO) {
apiService.getUsers()
}
// 3. 自动切回主线程更新UI
_users.value = result
} catch (e: Exception) {
_users.value = emptyList()
}
}
}
}
}
3.2 多次线程切换
/**
* 多次线程切换示例
*/
class MultipleContextSwitch {
suspend fun complexOperation(): Result {
// 1. IO线程:网络请求
val rawData = withContext(Dispatchers.IO) {
apiService.fetchData()
}
// 2. Default线程:数据处理(CPU密集)
val processedData = withContext(Dispatchers.Default) {
processLargeData(rawData)
}
// 3. IO线程:保存到数据库
withContext(Dispatchers.IO) {
database.save(processedData)
}
// 4. 返回到原调度器
return Result.Success(processedData)
}
private fun processLargeData(data: String): ProcessedData {
// CPU密集处理
return ProcessedData()
}
}
3.3 withContext vs async/await
/**
* withContext 与 async/await 对比
*/
class ContextSwitchComparison {
// ✅ 推荐: 顺序执行使用withContext
suspend fun sequentialOperations(): String {
val data1 = withContext(Dispatchers.IO) { fetchData1() }
val data2 = withContext(Dispatchers.IO) { fetchData2(data1) }
return "$data1 + $data2"
}
// ✅ 推荐: 并发执行使用async/await
suspend fun concurrentOperations(): String {
coroutineScope {
val deferred1 = async(Dispatchers.IO) { fetchData1() }
val deferred2 = async(Dispatchers.IO) { fetchData2("") }
val data1 = deferred1.await()
val data2 = deferred2.await()
return@coroutineScope "$data1 + $data2"
}
}
// ❌ 不推荐: 顺序操作使用async徒增复杂度
suspend fun unnecessaryAsync(): String {
coroutineScope {
val deferred1 = async(Dispatchers.IO) { fetchData1() }
val data1 = deferred1.await() // 立即await,失去并发优势
val deferred2 = async(Dispatchers.IO) { fetchData2(data1) }
val data2 = deferred2.await()
return@coroutineScope "$data1 + $data2"
}
}
private suspend fun fetchData1(): String = "Data1"
private suspend fun fetchData2(param: String): String = "Data2"
}
4. 调度器的选择策略
4.1 决策树
/**
* 调度器选择决策树
*/
class DispatcherDecisionTree {
// 场景1: 更新UI
// 答案: Dispatchers.Main
fun updateUI() {
viewModelScope.launch(Dispatchers.Main) {
textView.text = "Updated"
}
}
// 场景2: 网络请求
// 答案: Dispatchers.IO
suspend fun networkRequest() = withContext(Dispatchers.IO) {
api.getData()
}
// 场景3: 数据库操作
// 答案: Dispatchers.IO
suspend fun databaseOperation() = withContext(Dispatchers.IO) {
dao.query()
}
// 场景4: 文件读写
// 答案: Dispatchers.IO
suspend fun fileOperation() = withContext(Dispatchers.IO) {
file.readText()
}
// 场景5: JSON解析(大文件)
// 答案: Dispatchers.Default
suspend fun parseJson(json: String) = withContext(Dispatchers.Default) {
Json.decodeFromString<Data>(json)
}
// 场景6: 图像处理
// 答案: Dispatchers.Default
suspend fun processImage(bitmap: Bitmap) = withContext(Dispatchers.Default) {
applyFilters(bitmap)
}
// 场景7: 排序大列表
// 答案: Dispatchers.Default
suspend fun sortList(list: List<Int>) = withContext(Dispatchers.Default) {
list.sorted()
}
// 场景8: 加密/解密
// 答案: Dispatchers.Default
suspend fun encrypt(data: ByteArray) = withContext(Dispatchers.Default) {
cipher.doFinal(data)
}
}
4.2 性能权衡
/**
* 调度器性能权衡
*/
class DispatcherPerformance {
// 错误示例1: 在IO线程做CPU密集计算
// ❌ 会占用IO线程池,影响其他IO操作
suspend fun badExample1() {
withContext(Dispatchers.IO) {
// 大量CPU计算
(1..1000000).map { it * it }.sum()
}
}
// 正确示例1: CPU密集任务使用Default
// ✅ 使用专门的CPU线程池
suspend fun goodExample1() {
withContext(Dispatchers.Default) {
(1..1000000).map { it * it }.sum()
}
}
// 错误示例2: 在Default线程做IO操作
// ❌ 会阻塞CPU线程池
suspend fun badExample2() {
withContext(Dispatchers.Default) {
Thread.sleep(1000) // 阻塞线程
}
}
// 正确示例2: IO操作使用IO调度器
// ✅ IO线程池专门处理阻塞操作
suspend fun goodExample2() {
withContext(Dispatchers.IO) {
Thread.sleep(1000)
}
}
// 错误示例3: 频繁切换调度器
// ❌ 过度切换带来额外开销
suspend fun badExample3() {
repeat(100) {
withContext(Dispatchers.IO) {
// 简单操作
println(it)
}
}
}
// 正确示例3: 批量操作减少切换
// ✅ 一次切换处理批量任务
suspend fun goodExample3() {
withContext(Dispatchers.IO) {
repeat(100) {
println(it)
}
}
}
}
5. 自定义调度器
5.1 创建固定线程池调度器
/**
* 自定义线程池调度器
*/
class CustomDispatcher {
// 创建固定大小的线程池
private val customDispatcher = Executors.newFixedThreadPool(4)
.asCoroutineDispatcher()
// 创建单线程调度器
private val singleThreadDispatcher = Executors.newSingleThreadExecutor()
.asCoroutineDispatcher()
suspend fun useCustomDispatcher() {
withContext(customDispatcher) {
// 在自定义线程池执行
println("Custom thread: ${Thread.currentThread().name}")
}
}
// ⚠️ 重要: 记得关闭调度器释放资源
fun cleanup() {
customDispatcher.close()
singleThreadDispatcher.close()
}
}
5.2 limitedParallelism - 限制并发数
/**
* 限制并发数的调度器
*/
class LimitedParallelismExample {
// 限制IO操作并发数为2
private val limitedIO = Dispatchers.IO.limitedParallelism(2)
suspend fun downloadFiles(urls: List<String>) {
coroutineScope {
urls.forEach { url ->
launch(limitedIO) {
// 最多2个下载并发执行
downloadFile(url)
}
}
}
}
// 用例: 避免同时打开太多数据库连接
private val dbDispatcher = Dispatchers.IO.limitedParallelism(1)
suspend fun databaseOperations() {
withContext(dbDispatcher) {
// 确保数据库操作串行执行
database.transaction {
// 事务操作
}
}
}
private suspend fun downloadFile(url: String) {
// 下载逻辑
}
}
5.3 虚拟线程调度器(Java 19+)
/**
* 虚拟线程调度器
*/
@RequiresApi(Build.VERSION_CODES.TIRAMISU)
class VirtualThreadDispatcher {
// 使用Java虚拟线程(协程的另一种实现)
private val virtualDispatcher = Executors.newVirtualThreadPerTaskExecutor()
.asCoroutineDispatcher()
suspend fun useVirtualThreads() {
withContext(virtualDispatcher) {
// 在虚拟线程执行
// 适合大量IO密集任务
}
}
}
6. 性能优化与最佳实践
6.1 避免过度切换
/**
* 减少调度器切换
*/
class ReduceContextSwitch {
// ❌ 过度切换
suspend fun inefficient() {
withContext(Dispatchers.IO) { operation1() }
withContext(Dispatchers.IO) { operation2() }
withContext(Dispatchers.IO) { operation3() }
}
// ✅ 批量处理
suspend fun efficient() {
withContext(Dispatchers.IO) {
operation1()
operation2()
operation3()
}
}
private suspend fun operation1() {}
private suspend fun operation2() {}
private suspend fun operation3() {}
}
6.2 使用Main.immediate优化
/**
* Main.immediate 优化
*/
class MainImmediateOptimization {
// ❌ 不必要的调度
fun slowUpdate() {
viewModelScope.launch {
// 已经在Main线程
withContext(Dispatchers.Main) {
textView.text = "Update" // 会重新调度
}
}
}
// ✅ 使用immediate避免重复调度
fun fastUpdate() {
viewModelScope.launch {
withContext(Dispatchers.Main.immediate) {
textView.text = "Update" // 直接执行,不调度
}
}
}
}
6.3 协程调度器监控
/**
* 调度器性能监控
*/
class DispatcherMonitoring {
// 自定义拦截器监控调度器切换
class DispatcherInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<DispatcherInterceptor>
override val key: CoroutineContext.Key<*> get() = Key
fun onDispatch(dispatcher: CoroutineDispatcher) {
Timber.d("Dispatching to: $dispatcher on thread: ${Thread.currentThread().name}")
}
}
fun monitorDispatchers() {
val interceptor = DispatcherInterceptor()
CoroutineScope(Dispatchers.Main + interceptor).launch {
interceptor.onDispatch(Dispatchers.Main)
withContext(Dispatchers.IO) {
interceptor.onDispatch(Dispatchers.IO)
}
}
}
}
6.4 线程池调优
/**
* 线程池调优
*/
class ThreadPoolTuning {
// 根据任务特征创建优化的线程池
object OptimizedDispatchers {
// 短任务高并发(如API调用)
val highConcurrency = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 4
).asCoroutineDispatcher()
// 长任务低并发(如文件上传)
val longRunning = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
// CPU密集型
val cpuIntensive = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
).asCoroutineDispatcher()
}
suspend fun optimizedTaskExecution() {
// 根据任务类型选择合适的调度器
withContext(OptimizedDispatchers.highConcurrency) {
// 短任务
}
withContext(OptimizedDispatchers.longRunning) {
// 长任务
}
}
}
7. deviceSecurity实战案例
7.1 案例1: 视频流处理调度
/**
* deviceSecurity: 视频流处理的调度器策略
*/
class VideoStreamDispatcherStrategy @Inject constructor(
private val p2pRepository: P2PRepository,
private val videoDecoder: VideoDecoder
) {
// 自定义调度器
private val videoDecodeDispatcher = Dispatchers.Default.limitedParallelism(2)
private val networkDispatcher = Dispatchers.IO.limitedParallelism(4)
fun startVideoStream(deviceId: String): Flow<Bitmap> = flow {
// 1. 网络层: IO线程接收P2P数据
val p2pFlow = withContext(networkDispatcher) {
p2pRepository.startStream(deviceId)
}
p2pFlow.collect { videoPacket ->
// 2. 解码层: Default线程解码视频帧
val frame = withContext(videoDecodeDispatcher) {
videoDecoder.decode(videoPacket)
}
// 3. UI层: 主线程更新显示
withContext(Dispatchers.Main.immediate) {
emit(frame)
}
}
}.flowOn(networkDispatcher) // 整体在网络调度器执行
// 多路流并发处理
suspend fun handleMultipleStreams(deviceIds: List<String>): List<Flow<Bitmap>> {
return deviceIds.map { deviceId ->
// 每个流独立调度
startVideoStream(deviceId)
}
}
}
7.2 案例2: 设备事件处理调度
/**
* deviceSecurity: 设备事件分发调度策略
*/
class DeviceEventDispatcher @Inject constructor(
private val eventRepository: EventRepository,
private val notificationManager: NotificationManager,
private val database: EventDatabase
) {
// 事件处理专用调度器
private val eventProcessingDispatcher = Dispatchers.Default.limitedParallelism(4)
private val dbDispatcher = Dispatchers.IO.limitedParallelism(1) // 串行化数据库操作
fun observeDeviceEvents(): Flow<DeviceEvent> = flow {
// 监听事件流
eventRepository.getEventStream()
.flowOn(Dispatchers.IO) // IO线程接收事件
.collect { rawEvent ->
// CPU线程处理事件
val processedEvent = withContext(eventProcessingDispatcher) {
processEvent(rawEvent)
}
// 并发执行:存储 + 通知
coroutineScope {
// 串行存储到数据库
launch(dbDispatcher) {
database.saveEvent(processedEvent)
}
// 主线程显示通知
launch(Dispatchers.Main) {
notificationManager.show(processedEvent)
}
}
emit(processedEvent)
}
}
private fun processEvent(event: RawEvent): DeviceEvent {
// CPU密集的事件处理逻辑
return when (event.type) {
EventType.MOTION -> DeviceEvent.MotionDetected(event.deviceId)
EventType.SOUND -> DeviceEvent.SoundDetected(event.deviceId)
else -> DeviceEvent.Unknown
}
}
}
7.3 案例3: 设备连接管理调度
/**
* deviceSecurity: 设备连接调度优化
*/
class DeviceConnectionManager @Inject constructor(
private val p2pClient: P2PClient,
private val deviceRepository: DeviceRepository
) {
// 连接管理专用调度器
private val connectionDispatcher = Dispatchers.IO.limitedParallelism(8)
/**
* 并发连接多个设备(限制并发数避免资源耗尽)
*/
suspend fun connectMultipleDevices(deviceIds: List<String>): List<ConnectionResult> {
return coroutineScope {
deviceIds.map { deviceId ->
async(connectionDispatcher) {
connectDevice(deviceId)
}
}.awaitAll()
}
}
private suspend fun connectDevice(deviceId: String): ConnectionResult {
return try {
// IO操作: P2P连接
withContext(Dispatchers.IO) {
val connection = p2pClient.connect(deviceId)
// Default操作: 连接参数协商(CPU密集)
val params = withContext(Dispatchers.Default) {
negotiateParameters(connection)
}
// IO操作: 保存连接状态
deviceRepository.saveConnection(deviceId, params)
ConnectionResult.Success(deviceId)
}
} catch (e: Exception) {
ConnectionResult.Failure(deviceId, e)
}
}
private fun negotiateParameters(connection: P2PConnection): ConnectionParams {
// CPU密集的加密协商
return ConnectionParams()
}
/**
* 智能重连策略
*/
fun smartReconnect(deviceId: String) = flow {
var attempt = 0
val maxAttempts = 3
while (attempt < maxAttempts) {
// IO线程尝试重连
val result = withContext(connectionDispatcher) {
connectDevice(deviceId)
}
when (result) {
is ConnectionResult.Success -> {
emit(result)
return@flow
}
is ConnectionResult.Failure -> {
attempt++
// 指数退避
delay(1000L * (1 shl attempt))
}
}
}
emit(ConnectionResult.Failure(deviceId, Exception("Max retry attempts reached")))
}
}
sealed class ConnectionResult {
data class Success(val deviceId: String) : ConnectionResult()
data class Failure(val deviceId: String, val error: Exception) : ConnectionResult()
}
7.4 案例4: 图像AI推理调度
/**
* deviceSecurity: AI图像识别调度策略
*/
class AIInferenceDispatcher @Inject constructor(
private val tensorflowLite: Interpreter,
private val imagePreprocessor: ImagePreprocessor
) {
// AI推理专用调度器(限制并发避免内存爆炸)
private val inferenceDispatcher = Dispatchers.Default.limitedParallelism(1)
/**
* 视频流实时AI分析
*/
fun analyzeVideoStream(videoFlow: Flow<Bitmap>): Flow<AIResult> {
return videoFlow
.flowOn(Dispatchers.IO) // 视频接收在IO线程
.map { frame ->
// CPU密集: 图像预处理 + AI推理
withContext(inferenceDispatcher) {
// 1. 预处理(裁剪、缩放、归一化)
val preprocessed = imagePreprocessor.process(frame)
// 2. AI推理(CPU/NPU密集)
val output = runInference(preprocessed)
// 3. 后处理(解析结果)
parseResult(output)
}
}
.flowOn(inferenceDispatcher) // 整体推理流程在专用调度器
}
private fun runInference(input: FloatArray): FloatArray {
val output = Array(1) { FloatArray(1000) }
tensorflowLite.run(input, output)
return output[0]
}
private fun parseResult(output: FloatArray): AIResult {
val maxIndex = output.indices.maxByOrNull { output[it] } ?: 0
return AIResult(
label = LABELS[maxIndex],
confidence = output[maxIndex]
)
}
companion object {
private val LABELS = listOf("person", "car", "dog", "cat", /* ... */)
}
}
data class AIResult(val label: String, val confidence: Float)
8. 常见问题FAQ
Q1: Dispatchers.IO和Dispatchers.Default有什么区别?
A:
-
Dispatchers.IO:
- 用于阻塞IO操作(网络、文件、数据库)
- 线程池大小:max(64, CPU核心数)
- 可以阻塞线程而不影响性能
-
Dispatchers.Default:
- 用于CPU密集型计算
- 线程池大小:CPU核心数
- 不应该阻塞线程,否则影响其他协程
选择原则:
// ✅ IO操作
withContext(Dispatchers.IO) {
Thread.sleep(1000) // 阻塞OK
}
// ✅ CPU计算
withContext(Dispatchers.Default) {
(1..1000000).sum() // 计算OK
}
// ❌ 错误: CPU计算用IO
withContext(Dispatchers.IO) {
(1..1000000).sum() // 占用IO线程
}
// ❌ 错误: IO阻塞用Default
withContext(Dispatchers.Default) {
Thread.sleep(1000) // 阻塞CPU线程
}
Q2: 什么时候使用withContext vs async/await?
A:
使用withContext(顺序执行):
suspend fun sequential() {
val result1 = withContext(Dispatchers.IO) { fetch1() } // 等待完成
val result2 = withContext(Dispatchers.IO) { fetch2() } // 再执行这个
return result1 + result2
}
使用async/await(并发执行):
suspend fun concurrent() = coroutineScope {
val deferred1 = async(Dispatchers.IO) { fetch1() } // 立即启动
val deferred2 = async(Dispatchers.IO) { fetch2() } // 同时启动
deferred1.await() + deferred2.await() // 等待两者完成
}
性能对比:
- 顺序执行:
fetch1时间 + fetch2时间 - 并发执行:
max(fetch1时间, fetch2时间)
Q3: 如何避免调度器切换开销?
A: 4个优化技巧:
- 批量操作减少切换:
// ❌ 100次切换
repeat(100) {
withContext(Dispatchers.IO) { save(it) }
}
// ✅ 1次切换
withContext(Dispatchers.IO) {
repeat(100) { save(it) }
}
- 使用Main.immediate:
// ❌ 重复调度
withContext(Dispatchers.Main) { updateUI() } // 已在Main线程也会重新调度
// ✅ 避免重复
withContext(Dispatchers.Main.immediate) { updateUI() } // 在Main线程直接执行
- 合理使用flowOn:
flow {
emit(loadData()) // 在flowOn指定的调度器执行
}
.flowOn(Dispatchers.IO) // 整个flow在IO线程
.collect { updateUI(it) } // collect在调用者线程
- 避免不必要的切换:
// ❌ 不必要
suspend fun unnecessarySuspend {
withContext(Dispatchers.Default) {
println("Hello") // 简单操作不需要切换
}
}
// ✅ 直接执行
fun simpleOperation() {
println("Hello")
}
Q4: limitedParallelism有什么作用?
A: limitedParallelism 限制调度器的并发数,适用于:
- 限制资源消耗:
// 限制同时下载数为3
val downloadDispatcher = Dispatchers.IO.limitedParallelism(3)
suspend fun downloadFiles(urls: List<String>) {
urls.forEach { url ->
launch(downloadDispatcher) {
download(url) // 最多3个并发
}
}
}
- 串行化操作:
// 确保数据库事务串行执行
val dbDispatcher = Dispatchers.IO.limitedParallelism(1)
suspend fun transaction() {
withContext(dbDispatcher) {
database.update() // 串行执行,避免并发冲突
}
}
- 控制AI推理并发:
// AI推理很吃内存,限制并发数为1
val aiDispatcher = Dispatchers.Default.limitedParallelism(1)
suspend fun runAI(images: List<Bitmap>) {
images.forEach { image ->
launch(aiDispatcher) {
aiModel.predict(image) // 串行推理
}
}
}
Q5: 在ViewModel中应该用哪个调度器启动协程?
A: 使用 viewModelScope 时的最佳实践:
class MyViewModel : ViewModel() {
// ✅ 推荐: 在launch中指定IO调度器
fun loadData() {
viewModelScope.launch {
// 默认Main调度器
_uiState.value = Loading
try {
val data = withContext(Dispatchers.IO) {
repository.fetchData() // IO操作
}
_uiState.value = Success(data) // 自动回到Main
} catch (e: Exception) {
_uiState.value = Error(e)
}
}
}
// ✅ 也可以: 直接在launch指定调度器
fun loadDataAlternative() {
viewModelScope.launch(Dispatchers.IO) {
try {
val data = repository.fetchData()
withContext(Dispatchers.Main) {
_uiState.value = Success(data) // 切回Main更新UI
}
} catch (e: Exception) {
withContext(Dispatchers.Main) {
_uiState.value = Error(e)
}
}
}
}
// ❌ 不推荐: 使用GlobalScope
fun badExample() {
GlobalScope.launch { // 不会随ViewModel销毁
// 可能导致内存泄漏
}
}
}
推荐模式1的优势:
- 启动在Main线程,便于立即更新UI状态
- 需要IO时明确使用
withContext切换 - 操作完成自动回到Main线程
总结
核心要点
-
四大标准调度器
Dispatchers.Main: UI线程,更新界面Dispatchers.IO: IO操作,网络/文件/数据库Dispatchers.Default: CPU密集计算Dispatchers.Unconfined: 不推荐生产使用
-
线程切换
- 使用
withContext切换调度器 - 顺序操作用
withContext,并发操作用async/await Main.immediate优化避免重复调度
- 使用
-
调度器选择
- 更新UI → Main
- 网络/文件/数据库 → IO
- 计算/解码/加密 → Default
- 根据任务特征选择,避免错误使用
-
性能优化
- 批量操作减少切换次数
- 使用
limitedParallelism控制并发 - 自定义调度器优化特定场景
- 监控线程池使用情况
-
deviceSecurity实战
- 视频流: 网络(IO) → 解码(Default) → 显示(Main)
- 多设备连接:
limitedParallelism限制并发 - AI推理: 专用调度器串行执行
- 事件处理: 并发存储+通知
快速决策表
| 任务类型 | 调度器 | 示例 |
|---|---|---|
| 更新UI | Main | textView.text = "..." |
| 网络请求 | IO | api.fetchData() |
| 数据库查询 | IO | dao.getUsers() |
| 文件读写 | IO | file.readText() |
| JSON解析(大) | Default | Json.decode(json) |
| 图像处理 | Default | bitmap.filter() |
| 列表排序 | Default | list.sorted() |
| 加密解密 | Default | cipher.encrypt() |
| AI推理 | Default(limited) | model.predict() |
关键原则
✅ DO:
- 根据任务特征选择调度器
- 批量操作减少切换
- 使用
limitedParallelism控制并发 - 用
viewModelScope管理生命周期
❌ DON’T:
- 在IO线程做CPU密集计算
- 在Default线程做阻塞IO
- 使用
Dispatchers.Unconfined生产环境 - 使用
GlobalScope启动协程 - 频繁切换调度器
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)