文章目录

简述

原因
这几天需要搭建一个新的架构,整体使用MVVM,网络这一块选择使用okhttp+retrofit+flow来进行搭建。直接创建出来的Flow并不是线程安全的且使用起来也不是很方便,所以使用官方提供的创建Flow的APIcallbackFlow来进行创建。
这里记录一下callbackFlow使用的心得,踩过的坑就不希望大家在踩一遍了,另外整个架构搭建完成也会开源出来,感兴趣小伙伴的可以关注一波哈。也欢迎找我讨论技术呀!

原理简述
callbackFlow:底层使用channel来进行中转,首先通过produce创建一个ReceiveChannel。然后在调用collect的时候,在将channel的值取出来emit出去。

callbackFlow官方注释

Creates an instance of a cold Flow with elements that are sent to a SendChannel provided to the builder’s block of code via ProducerScope. It allows elements to be produced by code that is running in a different context or concurrently.
The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.
This builder ensures thread-safety and context preservation, thus the provided ProducerScope can be used from any context.
创建一个冷流的实例,其中的元素被发送到通过 ProducerScope 提供给构建器代码块的 SendChannel。 它允许由在不同上下文中或同时运行的代码生成元素。
结果流是冷的,这意味着每次将终端运算符应用于结果流时都会调用该块。
这个构建器确保线程安全和上下文保存,因此提供的 ProducerScope 可以在任何上下文中使用。

使用

下面进入callbackFlow具体使用环节:
举个🌰,简单使用。

val flow = callbackFlow<Int> {
    sendBlocking(1)
    sendBlocking(2)
    sendBlocking(3)
}

flow.collect { 
    log(it)
}

上面的写法好像没有什么问题,但是具体能不能运行呢?运行一下,发现报错了 🤦‍♀️
Exception in thread “main” java.lang.IllegalStateException: ‘awaitClose { yourCallbackOrListener.cancel() }’ should be used in the end of callbackFlow block.

说应该在callbackFlow末尾使用awaitClose,否则可能存在内存泄漏的危险。

其实就是一个确保肯定会执行的代码块,无论携程被取消或者flow被关闭,都会执行这个代码块,可以在这个代码块里面进行一些资源释放的操作等等,防止内存泄漏。从编译的结果来看,应该是需要强制在callbackFlow末尾行使用。
看一下awaitClose的源码:

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
    check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
    try {
        suspendCancellableCoroutine<Unit> { cont ->
            invokeOnClose {
                cont.resume(Unit)
            }
        }
    } finally {
        block()
    }
}

其实很简单嘛,其实就是一个挂起函数,直到接收到channelinvokeOnClose回调,才会resume回去。

那我们加上awaitClose,在运行一次:

val flow = callbackFlow<Int> {
     sendBlocking(1)
     sendBlocking(2)
     sendBlocking(3)
     awaitClose { log("release resources") }
 }

 flow.collect {
     log(it)
 }

哈 可以了 结果打印出来了👀

13:25:04:994 [main] 1
13:25:05:008 [main] 2
13:25:05:008 [main] 3

但是 有没有发现什么不对!!! 为什么没有被关闭呢(没有打印"release resources")?而且我的控制台一直显示程序还在运行中。。。。
所以,这个是需要手动关闭咯?

那我们加上了close试一下

 val flow = callbackFlow<Int> {
     sendBlocking(1)
     sendBlocking(2)
     sendBlocking(3)
     close()
     awaitClose { log("release resources") }
 }

 flow.collect {
     log(it)
 }

运行结果

13:27:25:366 [main] release resources
13:27:25:382 [main] 1
13:27:25:383 [main] 2
13:27:25:383 [main] 3

这次是可以了,而且控制台也显示程序运行结束。

那具体awaitClose什么时候被回调呢?换句话说,通过callbackFlow创建的flow什么时候可以被关闭呢?取消流收集cancel()或基于回调的 API 手动调用 SendChannel.close() 时调用或外部的协程被取消时,才会调用awaitClose。换句话说,需要手动关闭创建的callbackFlow,否则就会一直处于运行状态不会结束。

上面的看似已经完美了,但是其实还有一个问题?比如需要将retrofit的返回值包装成flow返回出去,那有没有可能出现异常呢?肯定是有的吧,至少会可能出现一个gson解析失败吧。

那出现异常怎么办呢?试一下

val flow = callbackFlow<Int> {
    sendBlocking(1)
    sendBlocking(2)
    sendBlocking(3)
    throw IllegalStateException("mock exception")
    close()
    awaitClose { log("release resources") }
}

flow.collect {
    log(it)
}

结果
在这里插入图片描述
直接抛异常了,下游一个值也没有接收到,这要是在主线程,岂不是程序就崩溃了嘛。肯定是不能允许这样的事情发生的。聪明的小伙伴已经想到了可以使用try catch来捕获一下异常。但是flow有更优雅的实现方式。

举个🌰

flow
.catch {
    log(it.message)
}.collect {
    log(it)
}

直接使用catch就可以捕获异常了,另外还有onCompletion类似于finally,大家可以下去自行查看。

13:39:17:860 [main] mock exception

需要注意一点:抛出异常后,前面发射的值也是接收不到的,这里只是接收到了异常信息!

flow还有很多有意思的操作,比如flowOn背压处理,bufferconflatecollectLatest,stateFlow 等,这些后面有时间也会发博客进行记录。

总结

callbackFlow简单总结

  • 应该使用 awaitClose 来保持流运行,否则在块完成时通道将立即关闭。
  • awaitClose 参数在流消费者取消流收集cancel()或基于回调的 API 手动调用 SendChannel.close() 时调用或外部的协程被取消,通常用于在完成后清理资源。
  • 需要进行资源关闭
  • 推荐使用catch进行异常捕获

最后放上官方的推荐使用例子(在awaitClose 里面进行api的取消注册,确保了不会发生内存泄漏)

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback { // Implementation of some callback interface
        override fun onNextValue(value: T) {
            // To avoid blocking you can configure channel capacity using
            // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
            try {
                sendBlocking(value)
            } catch (e: Exception) {
                // Handle exception from the channel: failure in flow or premature closing
            }
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    /*
     * Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
     * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
     * In both cases, callback will be properly unregistered.
     */
        awaitClose { api.unregister(callback) }
    }
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐