01-01-04 协程调度器与线程完全指南

文档概述

本文档深入讲解Kotlin协程的调度器机制与线程管理,包括调度器类型、线程切换、性能优化和deviceSecurity项目实战案例。

目录


1. 协程调度器基础

1.1 什么是协程调度器

协程调度器(CoroutineDispatcher)决定协程在哪个线程或线程池上执行。它是协程上下文(CoroutineContext)的一部分。

/**
 * 调度器基本概念
 */
class DispatcherBasics {

    // 1. 指定调度器启动协程
    fun example1() {
        CoroutineScope(Dispatchers.Main).launch {
            println("运行在主线程: ${Thread.currentThread().name}")
        }
    }

    // 2. 通过launch参数指定调度器
    fun example2() {
        CoroutineScope(Dispatchers.IO).launch(Dispatchers.Default) {
            println("运行在Default线程池: ${Thread.currentThread().name}")
        }
    }

    // 3. 查看当前调度器
    suspend fun checkDispatcher() {
        val dispatcher = coroutineContext[CoroutineDispatcher]
        println("当前调度器: $dispatcher")
    }
}

1.2 调度器的继承规则

/**
 * 调度器继承规则
 */
class DispatcherInheritance {

    fun demonstrateInheritance() {
        // 父协程的调度器
        CoroutineScope(Dispatchers.Main).launch {
            println("父协程: ${Thread.currentThread().name}")

            // 子协程继承父协程的调度器
            launch {
                println("子协程1(继承): ${Thread.currentThread().name}")
            }

            // 子协程可以指定不同的调度器
            launch(Dispatchers.IO) {
                println("子协程2(IO): ${Thread.currentThread().name}")
            }
        }
    }
}

2. 四大标准调度器

2.1 Dispatchers.Main - UI线程调度器

用途:在Android主线程执行,用于更新UI

/**
 * Dispatchers.Main 使用示例
 */
class MainDispatcherExample {

    // ✅ 推荐: 在Main调度器更新UI
    fun updateUI(viewModel: ViewModel) {
        viewModel.viewModelScope.launch(Dispatchers.Main) {
            val data = loadData() // 可能在IO线程
            textView.text = data  // 必须在主线程
        }
    }

    // Dispatchers.Main.immediate - 立即执行
    fun immediateExecution() {
        CoroutineScope(Dispatchers.Main).launch {
            // 如果已经在主线程,使用immediate避免不必要的调度
            withContext(Dispatchers.Main.immediate) {
                updateUIElement()
            }
        }
    }

    private suspend fun loadData(): String {
        return withContext(Dispatchers.IO) {
            // 网络或数据库操作
            "Data loaded"
        }
    }

    private fun updateUIElement() {
        // UI更新操作
    }
}

特点

  • 单线程执行
  • 避免阻塞主线程,否则会导致ANR
  • Main.immediate:如果已在主线程则立即执行,避免重新调度

2.2 Dispatchers.IO - IO操作调度器

用途:文件读写、网络请求、数据库操作

/**
 * Dispatchers.IO 使用示例
 */
class IODispatcherExample {

    // 网络请求
    suspend fun fetchDataFromNetwork(): String {
        return withContext(Dispatchers.IO) {
            val response = httpClient.get("https://api.example.com/data")
            response.body()
        }
    }

    // 文件读写
    suspend fun readFromFile(file: File): String {
        return withContext(Dispatchers.IO) {
            file.readText()
        }
    }

    // 数据库查询
    suspend fun queryDatabase(dao: UserDao): List<User> {
        return withContext(Dispatchers.IO) {
            dao.getAllUsers()
        }
    }

    // 多个IO操作并发执行
    suspend fun parallelIOOperations() {
        withContext(Dispatchers.IO) {
            val result1 = async { fetchDataFromNetwork() }
            val result2 = async { readFromFile(File("data.txt")) }
            val result3 = async { queryDatabase(userDao) }

            // 等待所有操作完成
            val data = result1.await()
            val fileContent = result2.await()
            val users = result3.await()
        }
    }
}

特点

  • 共享线程池,默认最大64个线程(或CPU核心数,取较大值)
  • 线程可复用,适合IO密集型任务
  • 阻塞操作不会影响其他协程

2.3 Dispatchers.Default - CPU密集型调度器

用途:计算密集型任务、数据处理、算法运算

/**
 * Dispatchers.Default 使用示例
 */
class DefaultDispatcherExample {

    // CPU密集型计算
    suspend fun calculateHash(data: ByteArray): String {
        return withContext(Dispatchers.Default) {
            MessageDigest.getInstance("SHA-256")
                .digest(data)
                .joinToString("") { "%02x".format(it) }
        }
    }

    // 大数据排序
    suspend fun sortLargeList(list: List<Int>): List<Int> {
        return withContext(Dispatchers.Default) {
            list.sorted()
        }
    }

    // 图像处理
    suspend fun processImage(bitmap: Bitmap): Bitmap {
        return withContext(Dispatchers.Default) {
            // CPU密集的图像处理算法
            applyFilter(bitmap)
        }
    }

    // 并行计算
    suspend fun parallelComputation(data: List<Int>): List<Int> {
        return withContext(Dispatchers.Default) {
            data.chunked(data.size / 4) // 分成4块
                .map { chunk ->
                    async { processChunk(chunk) }
                }
                .flatMap { it.await() }
        }
    }

    private fun applyFilter(bitmap: Bitmap): Bitmap {
        // 图像处理逻辑
        return bitmap
    }

    private fun processChunk(chunk: List<Int>): List<Int> {
        // 处理数据块
        return chunk.map { it * 2 }
    }
}

特点

  • 线程池大小等于CPU核心数
  • 适合不阻塞的计算密集型任务
  • 与IO调度器共享线程池资源

2.4 Dispatchers.Unconfined - 非受限调度器

用途:测试和特殊场景,生产环境慎用

/**
 * Dispatchers.Unconfined 使用示例
 */
class UnconfinedDispatcherExample {

    fun demonstrateUnconfined() {
        runBlocking {
            println("主线程: ${Thread.currentThread().name}")

            launch(Dispatchers.Unconfined) {
                println("启动时: ${Thread.currentThread().name}") // 主线程

                delay(100)

                println("恢复后: ${Thread.currentThread().name}") // kotlinx.coroutines.DefaultExecutor
            }
        }
    }

    // ⚠️ 不推荐在生产环境使用
    // 行为不可预测,难以调试
    fun badExample() {
        GlobalScope.launch(Dispatchers.Unconfined) {
            // 第一次挂起前在调用者线程
            println("Before suspend: ${Thread.currentThread().name}")

            withContext(Dispatchers.IO) {
                // IO操作
            }

            // 恢复后在IO线程池的某个线程
            println("After suspend: ${Thread.currentThread().name}")
        }
    }
}

特点

  • 启动时在调用者线程执行
  • 挂起后恢复时在恢复协程的线程执行
  • 行为不确定,难以追踪和调试
  • 不推荐在生产代码中使用

3. 线程切换与withContext

3.1 withContext基础

withContext 是切换调度器的标准方式,它会:

  1. 切换到指定调度器
  2. 执行代码块
  3. 返回结果
  4. 自动切换回原调度器
/**
 * withContext 基础用法
 */
class WithContextBasics {

    // 典型的MVVM模式
    class UserViewModel : ViewModel() {

        private val _users = MutableStateFlow<List<User>>(emptyList())
        val users: StateFlow<List<User>> = _users.asStateFlow()

        fun loadUsers() {
            viewModelScope.launch {
                // 1. 在主线程启动
                _users.value = emptyList()

                try {
                    // 2. 切换到IO线程进行网络请求
                    val result = withContext(Dispatchers.IO) {
                        apiService.getUsers()
                    }

                    // 3. 自动切回主线程更新UI
                    _users.value = result

                } catch (e: Exception) {
                    _users.value = emptyList()
                }
            }
        }
    }
}

3.2 多次线程切换

/**
 * 多次线程切换示例
 */
class MultipleContextSwitch {

    suspend fun complexOperation(): Result {
        // 1. IO线程:网络请求
        val rawData = withContext(Dispatchers.IO) {
            apiService.fetchData()
        }

        // 2. Default线程:数据处理(CPU密集)
        val processedData = withContext(Dispatchers.Default) {
            processLargeData(rawData)
        }

        // 3. IO线程:保存到数据库
        withContext(Dispatchers.IO) {
            database.save(processedData)
        }

        // 4. 返回到原调度器
        return Result.Success(processedData)
    }

    private fun processLargeData(data: String): ProcessedData {
        // CPU密集处理
        return ProcessedData()
    }
}

3.3 withContext vs async/await

/**
 * withContext 与 async/await 对比
 */
class ContextSwitchComparison {

    // ✅ 推荐: 顺序执行使用withContext
    suspend fun sequentialOperations(): String {
        val data1 = withContext(Dispatchers.IO) { fetchData1() }
        val data2 = withContext(Dispatchers.IO) { fetchData2(data1) }
        return "$data1 + $data2"
    }

    // ✅ 推荐: 并发执行使用async/await
    suspend fun concurrentOperations(): String {
        coroutineScope {
            val deferred1 = async(Dispatchers.IO) { fetchData1() }
            val deferred2 = async(Dispatchers.IO) { fetchData2("") }

            val data1 = deferred1.await()
            val data2 = deferred2.await()

            return@coroutineScope "$data1 + $data2"
        }
    }

    // ❌ 不推荐: 顺序操作使用async徒增复杂度
    suspend fun unnecessaryAsync(): String {
        coroutineScope {
            val deferred1 = async(Dispatchers.IO) { fetchData1() }
            val data1 = deferred1.await() // 立即await,失去并发优势

            val deferred2 = async(Dispatchers.IO) { fetchData2(data1) }
            val data2 = deferred2.await()

            return@coroutineScope "$data1 + $data2"
        }
    }

    private suspend fun fetchData1(): String = "Data1"
    private suspend fun fetchData2(param: String): String = "Data2"
}

4. 调度器的选择策略

4.1 决策树

/**
 * 调度器选择决策树
 */
class DispatcherDecisionTree {

    // 场景1: 更新UI
    // 答案: Dispatchers.Main
    fun updateUI() {
        viewModelScope.launch(Dispatchers.Main) {
            textView.text = "Updated"
        }
    }

    // 场景2: 网络请求
    // 答案: Dispatchers.IO
    suspend fun networkRequest() = withContext(Dispatchers.IO) {
        api.getData()
    }

    // 场景3: 数据库操作
    // 答案: Dispatchers.IO
    suspend fun databaseOperation() = withContext(Dispatchers.IO) {
        dao.query()
    }

    // 场景4: 文件读写
    // 答案: Dispatchers.IO
    suspend fun fileOperation() = withContext(Dispatchers.IO) {
        file.readText()
    }

    // 场景5: JSON解析(大文件)
    // 答案: Dispatchers.Default
    suspend fun parseJson(json: String) = withContext(Dispatchers.Default) {
        Json.decodeFromString<Data>(json)
    }

    // 场景6: 图像处理
    // 答案: Dispatchers.Default
    suspend fun processImage(bitmap: Bitmap) = withContext(Dispatchers.Default) {
        applyFilters(bitmap)
    }

    // 场景7: 排序大列表
    // 答案: Dispatchers.Default
    suspend fun sortList(list: List<Int>) = withContext(Dispatchers.Default) {
        list.sorted()
    }

    // 场景8: 加密/解密
    // 答案: Dispatchers.Default
    suspend fun encrypt(data: ByteArray) = withContext(Dispatchers.Default) {
        cipher.doFinal(data)
    }
}

4.2 性能权衡

/**
 * 调度器性能权衡
 */
class DispatcherPerformance {

    // 错误示例1: 在IO线程做CPU密集计算
    // ❌ 会占用IO线程池,影响其他IO操作
    suspend fun badExample1() {
        withContext(Dispatchers.IO) {
            // 大量CPU计算
            (1..1000000).map { it * it }.sum()
        }
    }

    // 正确示例1: CPU密集任务使用Default
    // ✅ 使用专门的CPU线程池
    suspend fun goodExample1() {
        withContext(Dispatchers.Default) {
            (1..1000000).map { it * it }.sum()
        }
    }

    // 错误示例2: 在Default线程做IO操作
    // ❌ 会阻塞CPU线程池
    suspend fun badExample2() {
        withContext(Dispatchers.Default) {
            Thread.sleep(1000) // 阻塞线程
        }
    }

    // 正确示例2: IO操作使用IO调度器
    // ✅ IO线程池专门处理阻塞操作
    suspend fun goodExample2() {
        withContext(Dispatchers.IO) {
            Thread.sleep(1000)
        }
    }

    // 错误示例3: 频繁切换调度器
    // ❌ 过度切换带来额外开销
    suspend fun badExample3() {
        repeat(100) {
            withContext(Dispatchers.IO) {
                // 简单操作
                println(it)
            }
        }
    }

    // 正确示例3: 批量操作减少切换
    // ✅ 一次切换处理批量任务
    suspend fun goodExample3() {
        withContext(Dispatchers.IO) {
            repeat(100) {
                println(it)
            }
        }
    }
}

5. 自定义调度器

5.1 创建固定线程池调度器

/**
 * 自定义线程池调度器
 */
class CustomDispatcher {

    // 创建固定大小的线程池
    private val customDispatcher = Executors.newFixedThreadPool(4)
        .asCoroutineDispatcher()

    // 创建单线程调度器
    private val singleThreadDispatcher = Executors.newSingleThreadExecutor()
        .asCoroutineDispatcher()

    suspend fun useCustomDispatcher() {
        withContext(customDispatcher) {
            // 在自定义线程池执行
            println("Custom thread: ${Thread.currentThread().name}")
        }
    }

    // ⚠️ 重要: 记得关闭调度器释放资源
    fun cleanup() {
        customDispatcher.close()
        singleThreadDispatcher.close()
    }
}

5.2 limitedParallelism - 限制并发数

/**
 * 限制并发数的调度器
 */
class LimitedParallelismExample {

    // 限制IO操作并发数为2
    private val limitedIO = Dispatchers.IO.limitedParallelism(2)

    suspend fun downloadFiles(urls: List<String>) {
        coroutineScope {
            urls.forEach { url ->
                launch(limitedIO) {
                    // 最多2个下载并发执行
                    downloadFile(url)
                }
            }
        }
    }

    // 用例: 避免同时打开太多数据库连接
    private val dbDispatcher = Dispatchers.IO.limitedParallelism(1)

    suspend fun databaseOperations() {
        withContext(dbDispatcher) {
            // 确保数据库操作串行执行
            database.transaction {
                // 事务操作
            }
        }
    }

    private suspend fun downloadFile(url: String) {
        // 下载逻辑
    }
}

5.3 虚拟线程调度器(Java 19+)

/**
 * 虚拟线程调度器
 */
@RequiresApi(Build.VERSION_CODES.TIRAMISU)
class VirtualThreadDispatcher {

    // 使用Java虚拟线程(协程的另一种实现)
    private val virtualDispatcher = Executors.newVirtualThreadPerTaskExecutor()
        .asCoroutineDispatcher()

    suspend fun useVirtualThreads() {
        withContext(virtualDispatcher) {
            // 在虚拟线程执行
            // 适合大量IO密集任务
        }
    }
}

6. 性能优化与最佳实践

6.1 避免过度切换

/**
 * 减少调度器切换
 */
class ReduceContextSwitch {

    // ❌ 过度切换
    suspend fun inefficient() {
        withContext(Dispatchers.IO) { operation1() }
        withContext(Dispatchers.IO) { operation2() }
        withContext(Dispatchers.IO) { operation3() }
    }

    // ✅ 批量处理
    suspend fun efficient() {
        withContext(Dispatchers.IO) {
            operation1()
            operation2()
            operation3()
        }
    }

    private suspend fun operation1() {}
    private suspend fun operation2() {}
    private suspend fun operation3() {}
}

6.2 使用Main.immediate优化

/**
 * Main.immediate 优化
 */
class MainImmediateOptimization {

    // ❌ 不必要的调度
    fun slowUpdate() {
        viewModelScope.launch {
            // 已经在Main线程
            withContext(Dispatchers.Main) {
                textView.text = "Update" // 会重新调度
            }
        }
    }

    // ✅ 使用immediate避免重复调度
    fun fastUpdate() {
        viewModelScope.launch {
            withContext(Dispatchers.Main.immediate) {
                textView.text = "Update" // 直接执行,不调度
            }
        }
    }
}

6.3 协程调度器监控

/**
 * 调度器性能监控
 */
class DispatcherMonitoring {

    // 自定义拦截器监控调度器切换
    class DispatcherInterceptor : CoroutineContext.Element {
        companion object Key : CoroutineContext.Key<DispatcherInterceptor>

        override val key: CoroutineContext.Key<*> get() = Key

        fun onDispatch(dispatcher: CoroutineDispatcher) {
            Timber.d("Dispatching to: $dispatcher on thread: ${Thread.currentThread().name}")
        }
    }

    fun monitorDispatchers() {
        val interceptor = DispatcherInterceptor()

        CoroutineScope(Dispatchers.Main + interceptor).launch {
            interceptor.onDispatch(Dispatchers.Main)

            withContext(Dispatchers.IO) {
                interceptor.onDispatch(Dispatchers.IO)
            }
        }
    }
}

6.4 线程池调优

/**
 * 线程池调优
 */
class ThreadPoolTuning {

    // 根据任务特征创建优化的线程池
    object OptimizedDispatchers {

        // 短任务高并发(如API调用)
        val highConcurrency = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 4
        ).asCoroutineDispatcher()

        // 长任务低并发(如文件上传)
        val longRunning = Executors.newFixedThreadPool(2).asCoroutineDispatcher()

        // CPU密集型
        val cpuIntensive = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
        ).asCoroutineDispatcher()
    }

    suspend fun optimizedTaskExecution() {
        // 根据任务类型选择合适的调度器
        withContext(OptimizedDispatchers.highConcurrency) {
            // 短任务
        }

        withContext(OptimizedDispatchers.longRunning) {
            // 长任务
        }
    }
}

7. deviceSecurity实战案例

7.1 案例1: 视频流处理调度

/**
 * deviceSecurity: 视频流处理的调度器策略
 */
class VideoStreamDispatcherStrategy @Inject constructor(
    private val p2pRepository: P2PRepository,
    private val videoDecoder: VideoDecoder
) {

    // 自定义调度器
    private val videoDecodeDispatcher = Dispatchers.Default.limitedParallelism(2)
    private val networkDispatcher = Dispatchers.IO.limitedParallelism(4)

    fun startVideoStream(deviceId: String): Flow<Bitmap> = flow {
        // 1. 网络层: IO线程接收P2P数据
        val p2pFlow = withContext(networkDispatcher) {
            p2pRepository.startStream(deviceId)
        }

        p2pFlow.collect { videoPacket ->
            // 2. 解码层: Default线程解码视频帧
            val frame = withContext(videoDecodeDispatcher) {
                videoDecoder.decode(videoPacket)
            }

            // 3. UI层: 主线程更新显示
            withContext(Dispatchers.Main.immediate) {
                emit(frame)
            }
        }
    }.flowOn(networkDispatcher) // 整体在网络调度器执行

    // 多路流并发处理
    suspend fun handleMultipleStreams(deviceIds: List<String>): List<Flow<Bitmap>> {
        return deviceIds.map { deviceId ->
            // 每个流独立调度
            startVideoStream(deviceId)
        }
    }
}

7.2 案例2: 设备事件处理调度

/**
 * deviceSecurity: 设备事件分发调度策略
 */
class DeviceEventDispatcher @Inject constructor(
    private val eventRepository: EventRepository,
    private val notificationManager: NotificationManager,
    private val database: EventDatabase
) {

    // 事件处理专用调度器
    private val eventProcessingDispatcher = Dispatchers.Default.limitedParallelism(4)
    private val dbDispatcher = Dispatchers.IO.limitedParallelism(1) // 串行化数据库操作

    fun observeDeviceEvents(): Flow<DeviceEvent> = flow {
        // 监听事件流
        eventRepository.getEventStream()
            .flowOn(Dispatchers.IO) // IO线程接收事件
            .collect { rawEvent ->
                // CPU线程处理事件
                val processedEvent = withContext(eventProcessingDispatcher) {
                    processEvent(rawEvent)
                }

                // 并发执行:存储 + 通知
                coroutineScope {
                    // 串行存储到数据库
                    launch(dbDispatcher) {
                        database.saveEvent(processedEvent)
                    }

                    // 主线程显示通知
                    launch(Dispatchers.Main) {
                        notificationManager.show(processedEvent)
                    }
                }

                emit(processedEvent)
            }
    }

    private fun processEvent(event: RawEvent): DeviceEvent {
        // CPU密集的事件处理逻辑
        return when (event.type) {
            EventType.MOTION -> DeviceEvent.MotionDetected(event.deviceId)
            EventType.SOUND -> DeviceEvent.SoundDetected(event.deviceId)
            else -> DeviceEvent.Unknown
        }
    }
}

7.3 案例3: 设备连接管理调度

/**
 * deviceSecurity: 设备连接调度优化
 */
class DeviceConnectionManager @Inject constructor(
    private val p2pClient: P2PClient,
    private val deviceRepository: DeviceRepository
) {

    // 连接管理专用调度器
    private val connectionDispatcher = Dispatchers.IO.limitedParallelism(8)

    /**
     * 并发连接多个设备(限制并发数避免资源耗尽)
     */
    suspend fun connectMultipleDevices(deviceIds: List<String>): List<ConnectionResult> {
        return coroutineScope {
            deviceIds.map { deviceId ->
                async(connectionDispatcher) {
                    connectDevice(deviceId)
                }
            }.awaitAll()
        }
    }

    private suspend fun connectDevice(deviceId: String): ConnectionResult {
        return try {
            // IO操作: P2P连接
            withContext(Dispatchers.IO) {
                val connection = p2pClient.connect(deviceId)

                // Default操作: 连接参数协商(CPU密集)
                val params = withContext(Dispatchers.Default) {
                    negotiateParameters(connection)
                }

                // IO操作: 保存连接状态
                deviceRepository.saveConnection(deviceId, params)

                ConnectionResult.Success(deviceId)
            }
        } catch (e: Exception) {
            ConnectionResult.Failure(deviceId, e)
        }
    }

    private fun negotiateParameters(connection: P2PConnection): ConnectionParams {
        // CPU密集的加密协商
        return ConnectionParams()
    }

    /**
     * 智能重连策略
     */
    fun smartReconnect(deviceId: String) = flow {
        var attempt = 0
        val maxAttempts = 3

        while (attempt < maxAttempts) {
            // IO线程尝试重连
            val result = withContext(connectionDispatcher) {
                connectDevice(deviceId)
            }

            when (result) {
                is ConnectionResult.Success -> {
                    emit(result)
                    return@flow
                }
                is ConnectionResult.Failure -> {
                    attempt++
                    // 指数退避
                    delay(1000L * (1 shl attempt))
                }
            }
        }

        emit(ConnectionResult.Failure(deviceId, Exception("Max retry attempts reached")))
    }
}

sealed class ConnectionResult {
    data class Success(val deviceId: String) : ConnectionResult()
    data class Failure(val deviceId: String, val error: Exception) : ConnectionResult()
}

7.4 案例4: 图像AI推理调度

/**
 * deviceSecurity: AI图像识别调度策略
 */
class AIInferenceDispatcher @Inject constructor(
    private val tensorflowLite: Interpreter,
    private val imagePreprocessor: ImagePreprocessor
) {

    // AI推理专用调度器(限制并发避免内存爆炸)
    private val inferenceDispatcher = Dispatchers.Default.limitedParallelism(1)

    /**
     * 视频流实时AI分析
     */
    fun analyzeVideoStream(videoFlow: Flow<Bitmap>): Flow<AIResult> {
        return videoFlow
            .flowOn(Dispatchers.IO) // 视频接收在IO线程
            .map { frame ->
                // CPU密集: 图像预处理 + AI推理
                withContext(inferenceDispatcher) {
                    // 1. 预处理(裁剪、缩放、归一化)
                    val preprocessed = imagePreprocessor.process(frame)

                    // 2. AI推理(CPU/NPU密集)
                    val output = runInference(preprocessed)

                    // 3. 后处理(解析结果)
                    parseResult(output)
                }
            }
            .flowOn(inferenceDispatcher) // 整体推理流程在专用调度器
    }

    private fun runInference(input: FloatArray): FloatArray {
        val output = Array(1) { FloatArray(1000) }
        tensorflowLite.run(input, output)
        return output[0]
    }

    private fun parseResult(output: FloatArray): AIResult {
        val maxIndex = output.indices.maxByOrNull { output[it] } ?: 0
        return AIResult(
            label = LABELS[maxIndex],
            confidence = output[maxIndex]
        )
    }

    companion object {
        private val LABELS = listOf("person", "car", "dog", "cat", /* ... */)
    }
}

data class AIResult(val label: String, val confidence: Float)

8. 常见问题FAQ

Q1: Dispatchers.IO和Dispatchers.Default有什么区别?

A:

  • Dispatchers.IO:

    • 用于阻塞IO操作(网络、文件、数据库)
    • 线程池大小:max(64, CPU核心数)
    • 可以阻塞线程而不影响性能
  • Dispatchers.Default:

    • 用于CPU密集型计算
    • 线程池大小:CPU核心数
    • 不应该阻塞线程,否则影响其他协程

选择原则

// ✅ IO操作
withContext(Dispatchers.IO) {
    Thread.sleep(1000) // 阻塞OK
}

// ✅ CPU计算
withContext(Dispatchers.Default) {
    (1..1000000).sum() // 计算OK
}

// ❌ 错误: CPU计算用IO
withContext(Dispatchers.IO) {
    (1..1000000).sum() // 占用IO线程
}

// ❌ 错误: IO阻塞用Default
withContext(Dispatchers.Default) {
    Thread.sleep(1000) // 阻塞CPU线程
}

Q2: 什么时候使用withContext vs async/await?

A:

使用withContext(顺序执行):

suspend fun sequential() {
    val result1 = withContext(Dispatchers.IO) { fetch1() } // 等待完成
    val result2 = withContext(Dispatchers.IO) { fetch2() } // 再执行这个
    return result1 + result2
}

使用async/await(并发执行):

suspend fun concurrent() = coroutineScope {
    val deferred1 = async(Dispatchers.IO) { fetch1() } // 立即启动
    val deferred2 = async(Dispatchers.IO) { fetch2() } // 同时启动
    deferred1.await() + deferred2.await() // 等待两者完成
}

性能对比:

  • 顺序执行: fetch1时间 + fetch2时间
  • 并发执行: max(fetch1时间, fetch2时间)

Q3: 如何避免调度器切换开销?

A: 4个优化技巧:

  1. 批量操作减少切换:
// ❌ 100次切换
repeat(100) {
    withContext(Dispatchers.IO) { save(it) }
}

// ✅ 1次切换
withContext(Dispatchers.IO) {
    repeat(100) { save(it) }
}
  1. 使用Main.immediate:
// ❌ 重复调度
withContext(Dispatchers.Main) { updateUI() } // 已在Main线程也会重新调度

// ✅ 避免重复
withContext(Dispatchers.Main.immediate) { updateUI() } // 在Main线程直接执行
  1. 合理使用flowOn:
flow {
    emit(loadData()) // 在flowOn指定的调度器执行
}
.flowOn(Dispatchers.IO) // 整个flow在IO线程
.collect { updateUI(it) } // collect在调用者线程
  1. 避免不必要的切换:
// ❌ 不必要
suspend fun unnecessarySuspend {
    withContext(Dispatchers.Default) {
        println("Hello") // 简单操作不需要切换
    }
}

// ✅ 直接执行
fun simpleOperation() {
    println("Hello")
}

Q4: limitedParallelism有什么作用?

A: limitedParallelism 限制调度器的并发数,适用于:

  1. 限制资源消耗:
// 限制同时下载数为3
val downloadDispatcher = Dispatchers.IO.limitedParallelism(3)

suspend fun downloadFiles(urls: List<String>) {
    urls.forEach { url ->
        launch(downloadDispatcher) {
            download(url) // 最多3个并发
        }
    }
}
  1. 串行化操作:
// 确保数据库事务串行执行
val dbDispatcher = Dispatchers.IO.limitedParallelism(1)

suspend fun transaction() {
    withContext(dbDispatcher) {
        database.update() // 串行执行,避免并发冲突
    }
}
  1. 控制AI推理并发:
// AI推理很吃内存,限制并发数为1
val aiDispatcher = Dispatchers.Default.limitedParallelism(1)

suspend fun runAI(images: List<Bitmap>) {
    images.forEach { image ->
        launch(aiDispatcher) {
            aiModel.predict(image) // 串行推理
        }
    }
}

Q5: 在ViewModel中应该用哪个调度器启动协程?

A: 使用 viewModelScope 时的最佳实践:

class MyViewModel : ViewModel() {

    // ✅ 推荐: 在launch中指定IO调度器
    fun loadData() {
        viewModelScope.launch {
            // 默认Main调度器
            _uiState.value = Loading

            try {
                val data = withContext(Dispatchers.IO) {
                    repository.fetchData() // IO操作
                }
                _uiState.value = Success(data) // 自动回到Main
            } catch (e: Exception) {
                _uiState.value = Error(e)
            }
        }
    }

    // ✅ 也可以: 直接在launch指定调度器
    fun loadDataAlternative() {
        viewModelScope.launch(Dispatchers.IO) {
            try {
                val data = repository.fetchData()

                withContext(Dispatchers.Main) {
                    _uiState.value = Success(data) // 切回Main更新UI
                }
            } catch (e: Exception) {
                withContext(Dispatchers.Main) {
                    _uiState.value = Error(e)
                }
            }
        }
    }

    // ❌ 不推荐: 使用GlobalScope
    fun badExample() {
        GlobalScope.launch { // 不会随ViewModel销毁
            // 可能导致内存泄漏
        }
    }
}

推荐模式1的优势:

  • 启动在Main线程,便于立即更新UI状态
  • 需要IO时明确使用withContext切换
  • 操作完成自动回到Main线程

总结

核心要点

  1. 四大标准调度器

    • Dispatchers.Main: UI线程,更新界面
    • Dispatchers.IO: IO操作,网络/文件/数据库
    • Dispatchers.Default: CPU密集计算
    • Dispatchers.Unconfined: 不推荐生产使用
  2. 线程切换

    • 使用withContext切换调度器
    • 顺序操作用withContext,并发操作用async/await
    • Main.immediate优化避免重复调度
  3. 调度器选择

    • 更新UI → Main
    • 网络/文件/数据库 → IO
    • 计算/解码/加密 → Default
    • 根据任务特征选择,避免错误使用
  4. 性能优化

    • 批量操作减少切换次数
    • 使用limitedParallelism控制并发
    • 自定义调度器优化特定场景
    • 监控线程池使用情况
  5. deviceSecurity实战

    • 视频流: 网络(IO) → 解码(Default) → 显示(Main)
    • 多设备连接: limitedParallelism限制并发
    • AI推理: 专用调度器串行执行
    • 事件处理: 并发存储+通知

快速决策表

任务类型 调度器 示例
更新UI Main textView.text = "..."
网络请求 IO api.fetchData()
数据库查询 IO dao.getUsers()
文件读写 IO file.readText()
JSON解析(大) Default Json.decode(json)
图像处理 Default bitmap.filter()
列表排序 Default list.sorted()
加密解密 Default cipher.encrypt()
AI推理 Default(limited) model.predict()

关键原则

DO:

  • 根据任务特征选择调度器
  • 批量操作减少切换
  • 使用limitedParallelism控制并发
  • viewModelScope管理生命周期

DON’T:

  • 在IO线程做CPU密集计算
  • 在Default线程做阻塞IO
  • 使用Dispatchers.Unconfined生产环境
  • 使用GlobalScope启动协程
  • 频繁切换调度器
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐