诞生 5 年之久的 RxJava,已经不只是一个开源库,可以说它的诞生改变了我们写代码的方式,把它比作「神兵利器」也毫不为过。我们现在已经能看到各式各样名为「最佳实践」的使用教程,如果我们没能用好这把利器,不仅不会发挥它的作用,反而会伤着我们自己。
回顾它的诞生原因,是为了解决回调地狱 (callback hell) 以及麻烦的线程切换。在 Android 开发中,哪个地方最会出现多层的回调嵌套以及频繁的线程切换呢?对!没错!是「网络请求」。所以 RxJava、Retrofit 这俩兄弟总会一起出现的,我们项目中关于 RxJava 的使用,也几乎都和网络请求相关。
过去的经验
最初我们对 RxJava + Retrofit 的使用经验都是来源于 RxJava 与 Retrofit 结合的最佳实践 这篇文章,相信大家都看过。这篇文章中的基本封装思想是:订阅每个网络请求的流,将流的订阅结果再通过回调的方式返给流(也就是网络请求)的创建者。 如下所示:
//HttpMethod
public void getTopMovie(final ResultListener listener, int start, int count){
movieService.getTopMovie(start, count)
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber(){
@Override
public void onStart() {}
@Override
public void onNext(Subject t) {
listener.onNext(t)
}
@Override
public void onError(Throwable e) {}
@Override
public void onCompleted() {}
})
}
//Activity
HttpMethods.getInstance().getTopMovie(new ResultListener(){
@Override
public void onNext(Subject t){
//handle result
}
}, 0, 10)
有什么问题?
这种封装方式,对于初步的使用以及简单的项目,是没有问题的。但是遇到复杂一点的网络请求,它的扩展性就不那么灵活了:
- 多个连续的网络请求怎么写?按照上面的那种封装方式,我们有两种选择
- 拆解这个请求,在
subscribe
之前通过flatMap
发起第二个或者第三个网络请求。这种写法肯定会影响项目中已有的外部调用。 - 在
onNext
中发起第二个请求,再在第二个网络请求的onNext
中发起第三个网络请求……这一层又一层的回调嵌套,正是用 RxJava 所能解决、避免的这样写,我们就又回到了最初的原点。
- 拆解这个请求,在
- 怎么取消网络请求?不取消,意味着内存泄露的风险。
回到 RxJava 本身
RxJava 提供给我们的、我们所中意的强大之处在哪?在于它的「操作符」,map
、flatMap
、zip
等等,甚至线程的切换 subscribeOn
、observeOn
也是操作符。RxJava 的各种强大的功能就是通过各式各样的「操作符」实现的。
操作符操作的是什么?流。流(Observable
、Flowable
)是 RxJava 的基本单位。所以一套链式请求拆开应该是这样的:
所以说,网络请求库对外提供网络请求的结果应该是以「流」的形式进行提供:
- 单个网络请求,对外提供单个「流」
- 多个网络请求,将多个网络请求结果流通过「操作符」组合成一个「流」对外提供
- 持久化:网络请求结果流和持久化的缓存流,总能通过「操作符」组合成一个对外提供的结果「流」
我们需要背压吗?
当生产者大于消费者,则市场价格会降低,则会产生背压问题(Backpressure)。解决背压有很多种策略,RxJava2 中的 Flowable
天然支持背压。所以 Flowable
这个万金油,不管三七二十一,直接拿来用是没有问题的。
但是,网络请求,会产生背压问题吗?不会,为了防止抬杠,可以说大部分情况下是不会的。网络请求的每一个流,即用即走,上游的生产者(Request
)和下游的消费者(Responese
),永远是一对一的关系,不会出现连续的事件流。杀鸡焉用牛刀,所以我们可以退一步,改用 Observable
。
网络请求不会出现连续的事件流,在 onNext
出现之后,onComplete
马上就会被调用,所以只需要这两者中的一个就够了,也就不用考虑 Observable
,同样 Maybe
也是可以排除的。
剩下的也就只有 Single
和 Completable
了,相对于 Single
,Completable
没有 map
和 flatMap
方法。所以需要进一步处理网络请求结果的我们,可以选择使用 Single
。
抛出异常
网络请求过程,协议层的异常会自动抛至 onError()
,如 404、503 错误。对于如下有请求结果但无目标请求数据,我们也应当作为异常来处理:
{
"code": "6002",
"msg": "公钥为空"
}
毕竟这样的请求结果,是后端经过异常处理返回给我们的。
假定我们的请求结果是这样的范式:
data CommonResult<T>(
var code: Int = 0,
var data: T? = null,
var message: String? = null
)
我们活用 RxJava 的操作符,用 map
来处理请求到的 ResponseBody
(这也是前面选择 Single 的原因),为了便于复用,可以定义一个这样的 mapper:
class CommonResultMapper<T> : Function<CommonResult<T>, T> {
override fun apply(t: CommonResult<T>): T {
val data = t.data
if (t.code == SUCCESS_CODE && data != null) {
return data
} else {
//抛出异常
throw Throwable("请求 $t 失败")
}
}
}
使用这个定义好的 mapper:
@GET(PUSH_URL)
fun fetchTag(@Query("udid") udid: String): Single<CommonResult<Tag>>
fun fetchTagResult(udid: String): Single<Tag> {
return netService.fetchTag(udid)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(CommonResultMapper())
}
如果你愿意,你还可以将线程切换和数据处理结合在一起,使用 RxJava 的 Transformer
//定义一个 transformer
fun <T> resultTransformer(): SingleTransformer<CommonResult<T>, T> {
return SingleTransformer { single ->
single.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(CommonResultMapper())
}
}
//使用
fun fetchTagResult(udid: String): Single<Tag> {
return netService.fetchTag(udid)
.compose(resultTransformer())
}
使用这样封装,结果归结果,异常归异常。
fetchTagResult("123321")
.subscribeBy(
onSuccess = { tag ->
//结果
},
onError = { e ->
//异常
}
)
回顾上图中的对内封装和对外可见,在得到真正想要的网络请求结果之前,需要一直保持对内封装的状态。因此,如果需要同时或者按顺序发起多个网络请求,那么就应该在对内封装中进行操作,例如可以使用 flatMap
按顺序发起第二个网络请求:
fun fetchUserSingle(tag: Tag): Single<User> {
return netService.fetchUser(tag)
}
fun fetchUserResult(udid: String): Single<User> {
return netService.fetchTag(udid)
.compose(resultTransformer())
.flatMap{ tag ->
//使用第一个请求的结果作为第二个请求的参数
return@flatMap fetchUserSingle(tag)
}
}
无论如何,善用操作符,我们的代码总会是「链式」的。
取消网络请求与内存泄漏
最后还需要关注一下这里的内存泄漏问题,在 Activity
销毁时,要及时取消掉这些已经失去上下文意义的网络请求。这里我们及时 unsubscribe
就好了。
同时在管理生命周期方面,也有更成熟的方案:RxLifecycle。
以上