我们眼中 RxJava 式的网络请求

诞生 5 年之久的 RxJava,已经不只是一个开源库,可以说它的诞生改变了我们写代码的方式,把它比作「神兵利器」也毫不为过。我们现在已经能看到各式各样名为「最佳实践」的使用教程,如果我们没能用好这把利器,不仅不会发挥它的作用,反而会伤着我们自己。

回顾它的诞生原因,是为了解决回调地狱 (callback hell) 以及麻烦的线程切换。在 Android 开发中,哪个地方最会出现多层的回调嵌套以及频繁的线程切换呢?对!没错!是「网络请求」。所以 RxJava、Retrofit 这俩兄弟总会一起出现的,我们项目中关于 RxJava 的使用,也几乎都和网络请求相关。

过去的经验

最初我们对 RxJava + Retrofit 的使用经验都是来源于 RxJava 与 Retrofit 结合的最佳实践 这篇文章,相信大家都看过。这篇文章中的基本封装思想是:订阅每个网络请求的流,将流的订阅结果再通过回调的方式返给流(也就是网络请求)的创建者。 如下所示:

//HttpMethod
public void getTopMovie(final ResultListener listener, int start, int count){
movieService.getTopMovie(start, count)
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber(){
@Override
public void onStart() {}

@Override
public void onNext(Subject t) {
listener.onNext(t)
}

@Override
public void onError(Throwable e) {}

@Override
public void onCompleted() {}

})
}

//Activity
HttpMethods.getInstance().getTopMovie(new ResultListener(){
@Override
public void onNext(Subject t){
//handle result
}
}, 0, 10)

有什么问题?

这种封装方式,对于初步的使用以及简单的项目,是没有问题的。但是遇到复杂一点的网络请求,它的扩展性就不那么灵活了:

  1. 多个连续的网络请求怎么写?按照上面的那种封装方式,我们有两种选择
    1. 拆解这个请求,在 subscribe 之前通过 flatMap 发起第二个或者第三个网络请求。这种写法肯定会影响项目中已有的外部调用。
    2. onNext 中发起第二个请求,再在第二个网络请求的 onNext 中发起第三个网络请求……这一层又一层的回调嵌套,正是用 RxJava 所能解决、避免的这样写,我们就又回到了最初的原点。
  2. 怎么取消网络请求?不取消,意味着内存泄露的风险。

回到 RxJava 本身

RxJava 提供给我们的、我们所中意的强大之处在哪?在于它的「操作符」,mapflatMapzip 等等,甚至线程的切换 subscribeOnobserveOn 也是操作符。RxJava 的各种强大的功能就是通过各式各样的「操作符」实现的。

操作符操作的是什么?流。流(ObservableFlowable)是 RxJava 的基本单位。所以一套链式请求拆开应该是这样的:



所以说,网络请求库对外提供网络请求的结果应该是以「流」的形式进行提供:

  • 单个网络请求,对外提供单个「流」
  • 多个网络请求,将多个网络请求结果流通过「操作符」组合成一个「流」对外提供
  • 持久化:网络请求结果流和持久化的缓存流,总能通过「操作符」组合成一个对外提供的结果「流」

我们需要背压吗?

当生产者大于消费者,则市场价格会降低,则会产生背压问题(Backpressure)。解决背压有很多种策略,RxJava2 中的 Flowable 天然支持背压。所以 Flowable 这个万金油,不管三七二十一,直接拿来用是没有问题的。

但是,网络请求,会产生背压问题吗?不会,为了防止抬杠,可以说大部分情况下是不会的。网络请求的每一个流,即用即走,上游的生产者(Request)和下游的消费者(Responese),永远是一对一的关系,不会出现连续的事件流。杀鸡焉用牛刀,所以我们可以退一步,改用 Observable

网络请求不会出现连续的事件流,在 onNext 出现之后,onComplete 马上就会被调用,所以只需要这两者中的一个就够了,也就不用考虑 Observable,同样 Maybe 也是可以排除的。

剩下的也就只有 SingleCompletable 了,相对于 SingleCompletable 没有 mapflatMap 方法。所以需要进一步处理网络请求结果的我们,可以选择使用 Single

抛出异常

网络请求过程,协议层的异常会自动抛至 onError() ,如 404、503 错误。对于如下有请求结果但无目标请求数据,我们也应当作为异常来处理:

{
"code": "6002",
"msg": "公钥为空"
}

毕竟这样的请求结果,是后端经过异常处理返回给我们的。

假定我们的请求结果是这样的范式:

data CommonResult<T>(
var code: Int = 0,
var data: T? = null,
var message: String? = null
)

我们活用 RxJava 的操作符,用 map 来处理请求到的 ResponseBody (这也是前面选择 Single 的原因),为了便于复用,可以定义一个这样的 mapper:

class CommonResultMapper<T> : Function<CommonResult<T>, T> {
override fun apply(t: CommonResult<T>): T {
val data = t.data
if (t.code == SUCCESS_CODE && data != null) {
return data
} else {
//抛出异常
throw Throwable("请求 $t 失败")
}
}
}

使用这个定义好的 mapper:

@GET(PUSH_URL)
fun fetchTag(@Query("udid") udid: String): Single<CommonResult<Tag>>

fun fetchTagResult(udid: String): Single<Tag> {
return netService.fetchTag(udid)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(CommonResultMapper())
}

如果你愿意,你还可以将线程切换数据处理结合在一起,使用 RxJava 的 Transformer

//定义一个 transformer
fun <T> resultTransformer(): SingleTransformer<CommonResult<T>, T> {
return SingleTransformer { single ->
single.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(CommonResultMapper())
}
}

//使用
fun fetchTagResult(udid: String): Single<Tag> {
return netService.fetchTag(udid)
.compose(resultTransformer())
}

使用这样封装,结果归结果,异常归异常。

fetchTagResult("123321")
.subscribeBy(
onSuccess = { tag ->
//结果
},
onError = { e ->
//异常
}
)

回顾上图中的对内封装对外可见,在得到真正想要的网络请求结果之前,需要一直保持对内封装的状态。因此,如果需要同时或者按顺序发起多个网络请求,那么就应该在对内封装中进行操作,例如可以使用 flatMap 按顺序发起第二个网络请求:

fun fetchUserSingle(tag: Tag): Single<User> {
return netService.fetchUser(tag)
}

fun fetchUserResult(udid: String): Single<User> {
return netService.fetchTag(udid)
.compose(resultTransformer())
.flatMap{ tag ->
//使用第一个请求的结果作为第二个请求的参数
return@flatMap fetchUserSingle(tag)
}
}

无论如何,善用操作符,我们的代码总会是「链式」的。

取消网络请求与内存泄漏

最后还需要关注一下这里的内存泄漏问题,在 Activity 销毁时,要及时取消掉这些已经失去上下文意义的网络请求。这里我们及时 unsubscribe 就好了。

同时在管理生命周期方面,也有更成熟的方案:RxLifecycle

以上