Flow : callbackFlow使用心得,避免踩坑!
简述
原因
这几天需要搭建一个新的架构,整体使用MVVM
,网络这一块选择使用okhttp+retrofit+flow
来进行搭建。直接创建出来的Flow并不是线程安全的且使用起来也不是很方便,所以使用官方提供的创建Flow的APIcallbackFlow
来进行创建。
这里记录一下callbackFlow
使用的心得,踩过的坑就不希望大家在踩一遍了,另外整个架构搭建完成也会开源出来,感兴趣小伙伴的可以关注一波哈。也欢迎找我讨论技术呀!
原理简述
callbackFlow:底层使用channel
来进行中转,首先通过produce
创建一个ReceiveChannel
。然后在调用collect
的时候,在将channel
的值取出来emit
出去。
callbackFlow官方注释
Creates an instance of a cold Flow with elements that are sent to a SendChannel provided to the builder’s block of code via ProducerScope. It allows elements to be produced by code that is running in a different context or concurrently.
The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.
This builder ensures thread-safety and context preservation, thus the provided ProducerScope can be used from any context.
创建一个冷流的实例,其中的元素被发送到通过 ProducerScope 提供给构建器代码块的 SendChannel。 它允许由在不同上下文中或同时运行的代码生成元素。
结果流是冷的,这意味着每次将终端运算符应用于结果流时都会调用该块。
这个构建器确保线程安全和上下文保存,因此提供的 ProducerScope 可以在任何上下文中使用。
使用
下面进入callbackFlow
具体使用环节:
举个🌰,简单使用。
val flow = callbackFlow<Int> {
sendBlocking(1)
sendBlocking(2)
sendBlocking(3)
}
flow.collect {
log(it)
}
上面的写法好像没有什么问题,但是具体能不能运行呢?运行一下,发现报错了 🤦♀️
Exception in thread “main” java.lang.IllegalStateException: ‘awaitClose { yourCallbackOrListener.cancel() }’ should be used in the end of callbackFlow block.
说应该在callbackFlow
末尾使用awaitClose
,否则可能存在内存泄漏的危险。
其实就是一个确保肯定会执行的代码块,无论携程被取消或者flow被关闭,都会执行这个代码块,可以在这个代码块里面进行一些资源释放的操作等等,防止内存泄漏。从编译的结果来看,应该是需要强制在callbackFlow
末尾行使用。
看一下awaitClose
的源码:
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
try {
suspendCancellableCoroutine<Unit> { cont ->
invokeOnClose {
cont.resume(Unit)
}
}
} finally {
block()
}
}
其实很简单嘛,其实就是一个挂起函数,直到接收到channel
的invokeOnClose
回调,才会resume
回去。
那我们加上awaitClose
,在运行一次:
val flow = callbackFlow<Int> {
sendBlocking(1)
sendBlocking(2)
sendBlocking(3)
awaitClose { log("release resources") }
}
flow.collect {
log(it)
}
哈 可以了 结果打印出来了👀
13:25:04:994 [main] 1
13:25:05:008 [main] 2
13:25:05:008 [main] 3
但是 有没有发现什么不对!!! 为什么没有被关闭呢(没有打印"release resources")?而且我的控制台一直显示程序还在运行中。。。。
所以,这个是需要手动关闭咯?
那我们加上了close
试一下
val flow = callbackFlow<Int> {
sendBlocking(1)
sendBlocking(2)
sendBlocking(3)
close()
awaitClose { log("release resources") }
}
flow.collect {
log(it)
}
运行结果
13:27:25:366 [main] release resources
13:27:25:382 [main] 1
13:27:25:383 [main] 2
13:27:25:383 [main] 3
这次是可以了,而且控制台也显示程序运行结束。
那具体awaitClose
什么时候被回调呢?换句话说,通过callbackFlow
创建的flow什么时候可以被关闭呢?取消流收集cancel()
或基于回调的 API 手动调用 SendChannel.close()
时调用或外部的协程被取消时,才会调用awaitClose
。换句话说,需要手动关闭创建的callbackFlow
,否则就会一直处于运行状态不会结束。
上面的看似已经完美了,但是其实还有一个问题?比如需要将retrofit
的返回值包装成flow
返回出去,那有没有可能出现异常呢?肯定是有的吧,至少会可能出现一个gson解析失败吧。
那出现异常怎么办呢?试一下
val flow = callbackFlow<Int> {
sendBlocking(1)
sendBlocking(2)
sendBlocking(3)
throw IllegalStateException("mock exception")
close()
awaitClose { log("release resources") }
}
flow.collect {
log(it)
}
结果
直接抛异常了,下游一个值也没有接收到,这要是在主线程,岂不是程序就崩溃了嘛。肯定是不能允许这样的事情发生的。聪明的小伙伴已经想到了可以使用try catch来捕获一下异常。但是flow有更优雅的实现方式。
举个🌰
flow
.catch {
log(it.message)
}.collect {
log(it)
}
直接使用catch就可以捕获异常了,另外还有onCompletion类似于finally,大家可以下去自行查看。
13:39:17:860 [main] mock exception
需要注意一点:抛出异常后,前面发射的值也是接收不到的,这里只是接收到了异常信息!
flow还有很多有意思的操作,比如flowOn
背压处理,buffer
、conflate
、collectLatest
,stateFlow 等,这些后面有时间也会发博客进行记录。
总结
callbackFlow简单总结
- 应该使用 awaitClose 来保持流运行,否则在块完成时通道将立即关闭。
- awaitClose 参数在流消费者取消流收集
cancel()
或基于回调的 API 手动调用SendChannel.close()
时调用或外部的协程被取消,通常用于在完成后清理资源。 - 需要进行资源关闭
- 推荐使用catch进行异常捕获
最后放上官方的推荐使用例子(在awaitClose 里面进行api的取消注册,确保了不会发生内存泄漏)
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback { // Implementation of some callback interface
override fun onNextValue(value: T) {
// To avoid blocking you can configure channel capacity using
// either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
try {
sendBlocking(value)
} catch (e: Exception) {
// Handle exception from the channel: failure in flow or premature closing
}
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
}
api.register(callback)
/*
* Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
* or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
* In both cases, callback will be properly unregistered.
*/
awaitClose { api.unregister(callback) }
}
更多推荐
所有评论(0)