我是靠谱客的博主 威武百褶裙,这篇文章主要介绍RxJava2.0学习笔记(Backpressure,Flowable),现在分享给大家,希望可以做个参考。

文章转载自:大神的简书

讲Backpressure之前先回顾一下zip,zip可以将事件组合发送,但是如果有一个Observable发送的速度很快,当它发送了1000个事件时,另外一个Observable只发送了1个事件,也就是只能组合一个,另外999个事件等待组合,那它们放在哪儿呢?其实,Zip给每个水管配备了一个水缸,用来存放发送待处理的事件。如图:

这里写图片描述

当需要组合的时候,如果水缸里面都有事件,就会从水缸中取出事件组合,取出的顺序是先进先出,也就是队列的结构,源码实现上就是用队列模拟水缸。

水缸有容量限制,不可能一直存储,看下面的例子:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { //无限循环发事件 emitter.onNext(i); } } }).subscribeOn(Schedulers.io()); Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("A"); } }).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.w(TAG, throwable); } });

一个Observable没有发送complete,可以无限发送数据,另一个Observable随意发送,结果如下:

这里写图片描述

可以看出内存一下子就被占满了,最终报出了OOM。

这就引出了Backpressure,Backpressure就是为了控制流量。既然水缸容量有限,就需要从源头抑制流量。

下面先看一个Observable的情况:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { //无限循环发事件 emitter.onNext(i); } } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.d(TAG, "" + integer); } });

结果如下:

这里写图片描述

可以看出流量得到了遏制,这是因为上下游在同一个线程中,调用emitter.onNext(i)实际上是在调用Consumer中的方法,延迟两秒发送。

如果将上下游放在不同的线程中执行,会出现最开始的那种爆内存的情况。

具体原因就是异步同步的问题。
同一个线程中存在着同步订阅的关系,上游必须等到下游接受并处理完事件以后才能发送事件,因为一个线程同一时间只能做一件事,不能这边不停发送那边还在处理接收事件。
异步订阅:不同线程之间没法直接通信,需要有个中间媒介,而水缸就是中间媒介,管道将事件不断地发送到水缸中,再由水缸拿出事件组合,不同水缸之间不做操作下无法相互影响,所以这边快,那边慢,就可能产生OOM。

解决办法:

  1. 控制上游发送事件的速度,即可以给上游加线程的sleep。
  2. 控制上游发送事件的“量”,即控制发送到水缸中事件的数量。

补充说明:

rxjava2.x的Observable是不存在背压的概念的,背压是下游控制上游流速的一种手段。在rxjava1.x的时代,上游会给下游set一个producer,下游通过producer向上游请求n个数据,这样上游就有记录下游请求了多少个数据,然后下游请求多少个上游就给多少个,这个就是背压。一般来讲,每个节点都有缓存,比如说缓存的大小是64,这个时候下游可以一次性向上游request 64个数据。rxjava1.x的有些操作符不支持背压,也就是说这些操作符不会给下游set一个producer,也就是上游根本不理会下游的请求,一直向下游丢数据,如果下游的缓存爆了,那么下游就会抛出MissingBackpressureException,也就是背压失效了。在rxjava2.x时代,上述的背压逻辑全部挪到Flowable里了,所以说Flowable支持背压。而2.x时代的Observable是没有背压的概念的,Observable如果来不及消费会死命的缓存直到OOM,所以rxjava2.x的官方文档里面有讲,大数据流用Flowable,小数据流用Observable。


Flowable

还是之前的两根水管,只是上游的Observable换成了Flowable,下游的Observer换成了Subscriber,之间的连通还是用subscribe()。

基本用法:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); } }, BackpressureStrategy.ERROR); //增加了一个参数 Subscriber<Integer> downstream = new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); s.request(Long.MAX_VALUE); //注意这句代码 } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; upstream.subscribe(downstream);

结果与Observable一致,多出来的参数用来选择背压,这里我们选择BackpressureStrategy.ERROR方式,也就是在上下流速不均衡的时候会抛出MissingBackpressureException的异常;原本的参数Disposable被Subscription取代,它们都可以切断水管,前者调用dispose()方法,后者调用cancel()方法,不同在于后者多了一个void request(long n)方法。

Flowable采用的是响应拉取的方式来解决上下游流量不均衡的问题,request可以看作是下游解决问题的能力,下游向上游request多少个事件,上游如果有的话,就给下游发送多少个事件,这样就能够解决OOM的问题了。

这样就可以解释去掉request()方法后的现象了。

在同一个线程中,上游发送第一个事件以后,就报MissingBackpressureException异常,并且下游接收不到任何事件;这是因为在同一个线程中是同步订阅关系,下游没有request,上游认为下游不能处理事件,就一直等待,但是长时间没有响应会导致页面卡住,所以就抛出了异常,然后继续发送,但是下游接收不到事件。

结果如下:

复制代码
1
2
3
4
5
6
7
zlc.season.rxjava2demo D/TAG: onSubscribe zlc.season.rxjava2demo D/TAG: emit 1 zlc.season.rxjava2demo W/TAG: onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests ...... zlc.season.rxjava2demo D/TAG: emit 2 zlc.season.rxjava2demo D/TAG: emit 3 zlc.season.rxjava2demo D/TAG: emit complete

在不同线程中,上游可以正常发送事件,但是下游依然接受不到;接收不到原因和上面相同,因为没有request,没有报出异常是因为异步订阅里每个管道存在着一个水缸,上游将事件发送到水缸里面存储了,Flowable中水缸默认的大小为128,这是在源码中buffersize变量定义的。当然我们可以手动request事件,每次request之后会从水缸中取出响应数量的事件。

不同线程结果如下:

复制代码
1
2
3
4
5
zlc.season.rxjava2demo D/TAG: onSubscribe zlc.season.rxjava2demo D/TAG: emit 1 zlc.season.rxjava2demo D/TAG: emit 2 zlc.season.rxjava2demo D/TAG: emit 3 zlc.season.rxjava2demo D/TAG: emit complete

BackpressureStrategy

背压策略主要有四种:

  • BackpressureStrategy.ERROR,前面已经用到过,会在上下游流量不平衡的时候报出MissingBackpressureException的错误。
  • BackpressureStrategy.BUFFER,这种背压策略和Observable没有什么区别,上游可以无限发送,水缸足够大,最后还是会抛出OOM;并且可以发现Flowable里无限发送的话,内存增长的比Observable慢,这是因为Flowable采用响应拉取,难免会损耗些性能。
  • BackpressureStrategy.DROP,水缸里只存储128个事件,剩余的事件舍去,如果下游拉取了事件,则上游当前正在发送的事件在拉取时刻补充进水缸。
  • BackpressureStrategy.LATEST,下游总能获取到最后最新的事件,因为水缸中最后进来的事件总会被新的事件overwrite,所以可以每次拉取总能获得最后或是最新的事件。

如果不是我们自定义的Flowable,无法使用常量标识符,RxJava给我们提供了如下方法:

  • onBackpressureDrop()
  • onBackpressureBuffer()
  • onBackpressureLatest()

用法如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Flowable.interval(1, TimeUnit.MICROSECONDS) .onBackpressureDrop() //加上背压策略 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Long>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); mSubscription = s; s.request(Long.MAX_VALUE); } @Override public void onNext(Long aLong) { Log.d(TAG, "onNext: " + aLong); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });

前面我们说Flowable采用响应拉取的方式,但是前面的例子都是下游request了,但是上游却一直在发送,如何让上游知道下游request的数量从而做到下游request多少个上游就emit多少个呢?

从FlowableEmitter的源码中我们可以看到有这样一个抽象方法:

复制代码
1
long requested();

通过FlowableEmitter的实例调用这个方法可以获得下游request的事件个数,并且多次调用可以叠加,而每当上游emit一个onNext()事件,requested()获得的数就自减1,注意发送error和complete事件不会消耗request值,一旦减到0以后,如果下游没有继续request,就会报错MissingBackpressureException。

同步订阅:

这里写图片描述

异步订阅:

这里写图片描述

同步订阅中下游request()方法可以同步修改上游requested()方法的返回值;
而异步订阅中,上下游线程中各有一个requested(),下游的request()方法只能改变下游主线程中的requested()的返回值,上游该方法的返回值由RxJava内部去调用request(n)来改变,人为不能强制调用,一开始的时候其内部调用的是request(128),这就解释了水缸的默认容量为什么是128了,那什么时候调用我们设置的request的值呢?

看以下两个运行结果:

复制代码
1
2
3
4
5
6
7
8
9
10
11
D/TAG: onSubscribe D/TAG: First requested = 128 D/TAG: emit 0 , requested = 127 D/TAG: emit 1 , requested = 126 D/TAG: emit 2 , requested = 125 ... D/TAG: emit 124 , requested = 3 D/TAG: emit 125 , requested = 2 D/TAG: emit 126 , requested = 1 D/TAG: emit 127 , requested = 0 D/TAG: Oh no! I can't emit value!

上面代表一开始request的128个事件,这时候上游会发送128个事件到水缸中,没有调用我们设置的值。而一旦我们手动request了96(注意这里96是源码中规定的,其他值都不行)个事件,当下游消耗掉了96个事件以后,上游会继续发送96个事件进入水缸,并且上游获得的requested()的返回值也是96,如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
D/TAG: onNext: 0 D/TAG: onNext: 1 ... D/TAG: onNext: 92 D/TAG: onNext: 93 D/TAG: onNext: 94 D/TAG: onNext: 95 D/TAG: emit 128 , requested = 95 D/TAG: emit 129 , requested = 94 D/TAG: emit 130 , requested = 93 D/TAG: emit 131 , requested = 92 ... D/TAG: emit 219 , requested = 4 D/TAG: emit 220 , requested = 3 D/TAG: emit 221 , requested = 2 D/TAG: emit 222 , requested = 1 D/TAG: emit 223 , requested = 0 D/TAG: Oh no! I can't emit value!

实践

从文本中一行一行读取,采用边读边处理的方式。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public static void main(String[] args) { practice1(); try { Thread.sleep(10000000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void practice1() { Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception { try { FileReader reader = new FileReader("test.txt"); BufferedReader br = new BufferedReader(reader); String str; while ((str = br.readLine()) != null && !emitter.isCancelled()) { while (emitter.requested() == 0) { if (emitter.isCancelled()) { break; } } emitter.onNext(str); } br.close(); reader.close(); emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } } }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { mSubscription = s; s.request(1); } @Override public void onNext(String string) { System.out.println(string); try { Thread.sleep(2000); mSubscription.request(1); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onError(Throwable t) { System.out.println(t); } @Override public void onComplete() { } }); }

结果是:

这里写图片描述

最后

以上就是威武百褶裙最近收集整理的关于RxJava2.0学习笔记(Backpressure,Flowable)的全部内容,更多相关RxJava2内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(123)

评论列表共有 0 条评论

立即
投稿
返回
顶部