连接操作符
concatXXX
concat 操作符
concat 它会连接多个 Observable,并且必须要等到前一个 Observable 的所有数据项都发送完之后,才会开始下一个 Observable 数据的发送
将多个 Observable 合并成一个,zip 是多个 Observable 合并。
多个 Observable,依次合并各个 Observable。当中有 Observable onComplete() 后,直接跳过当前这个;如果当前 Observable 有 oError,那么会抛到 concat 的 onError。
案例:
1
2
3
4
5
6
7
8
9
10
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6, 8, 0))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "concat : " + integer + "\n");
sb.append("concat : " + integer + "\n");
mTvOperatorResult1.setText(sb.toString());
}
});
结果:
1
2
3
4
5
6
7
8
concat : 1
concat : 2
concat : 3
concat : 4
concat : 5
concat : 6
concat : 8
concat : 0
concatEager
它和 concat 最大的不同就是多个 Observable 可以同时开始发射数据,如果后一个 Observable 发射完成后,前一个 Observable 还有发射完数据,那么它会将后一个 Observable 的数据先缓存起来,等到前一个 Observable 发射完毕后,才将缓存的数据发射出去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void button1Click() {
Observable<List<CacheDemoUtils.User>> just1 = CacheDemoUtils.getMemory().subscribeOn(Schedulers.io()); // 指定线程
Observable<List<CacheDemoUtils.User>> just2 = CacheDemoUtils.getDisk().subscribeOn(Schedulers.io());
Observable<List<CacheDemoUtils.User>> just3 = CacheDemoUtils.getNetwork().subscribeOn(Schedulers.io());
List<Observable<List<CacheDemoUtils.User>>> justs = new ArrayList<>();
justs.add(just1);
justs.add(just2);
justs.add(just3);
Observable.concatEager(justs)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<CacheDemoUtils.User>>() {
@Override
public void accept(List<CacheDemoUtils.User> users) throws Exception {
sb.append("concatEager : " + users.size() + ",from:" + users.get(0).from + "," + System.currentTimeMillis() + "\n");
LogUtil.logi(TAG, "concatEager", users.size() + ",from:" + users.get(0).from + "," + System.currentTimeMillis() + "\n");
mTvOperatorResult1.setText(sb.toString());
}
});
}
结果:
1
2
3
4
5
6
【getNetwork】开始从网络获取数据,需要耗时:8000,线程:RxCachedThreadScheduler-4,日期:2018-11-09 15:05:29
【getMemory】开始从内存获取数据,需要耗时:10000,线程:RxCachedThreadScheduler-2,日期:2018-11-09 15:05:29
【getDisk】开始从磁盘获取数据,需要耗时:5000,线程:RxCachedThreadScheduler-3,日期:2018-11-09 15:05:29
【concatEager】20,from:memory,1541747139752,线程:main,日期:2018-11-09 15:05:39
【concatEager】15,from:disk,1541747139756,线程:main,日期:2018-11-09 15:05:39
【concatEager】30,from:network,1541747139758,线程:main,日期:2018-11-09 15:05:39
注意: 要想实现这种效果,需要将 concatEager 各个 Observable 订阅在子线程中,而不要使用 concatEager 的线程,否则他们还是顺序在同一个线程执行,达不到同时执行的效果。
concatEager
它和 concat 最大的不同就是多个 Observable 可以同时开始发射数据,如果后一个 Observable 发射完成后,前一个 Observable 还有发射完数据,那么它会将后一个 Observable 的数据先缓存起来,等到前一个 Observable 发射完毕后,才将缓存的数据发射出去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void button1Click() {
Observable<List<CacheDemoUtils.User>> just1 = CacheDemoUtils.getMemory().subscribeOn(Schedulers.io()); // 指定线程
Observable<List<CacheDemoUtils.User>> just2 = CacheDemoUtils.getDisk().subscribeOn(Schedulers.io());
Observable<List<CacheDemoUtils.User>> just3 = CacheDemoUtils.getNetwork().subscribeOn(Schedulers.io());
List<Observable<List<CacheDemoUtils.User>>> justs = new ArrayList<>();
justs.add(just1);
justs.add(just2);
justs.add(just3);
Observable.concatEager(justs)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<CacheDemoUtils.User>>() {
@Override
public void accept(List<CacheDemoUtils.User> users) throws Exception {
sb.append("concatEager : " + users.size() + ",from:" + users.get(0).from + "," + System.currentTimeMillis() + "\n");
LogUtil.logi(TAG, "concatEager", users.size() + ",from:" + users.get(0).from + "," + System.currentTimeMillis() + "\n");
mTvOperatorResult1.setText(sb.toString());
}
});
}
结果:
1
2
3
4
5
6
【getNetwork】开始从网络获取数据,需要耗时:8000,线程:RxCachedThreadScheduler-4,日期:2018-11-09 15:05:29
【getMemory】开始从内存获取数据,需要耗时:10000,线程:RxCachedThreadScheduler-2,日期:2018-11-09 15:05:29
【getDisk】开始从磁盘获取数据,需要耗时:5000,线程:RxCachedThreadScheduler-3,日期:2018-11-09 15:05:29
【concatEager】20,from:memory,1541747139752,线程:main,日期:2018-11-09 15:05:39
【concatEager】15,from:disk,1541747139756,线程:main,日期:2018-11-09 15:05:39
【concatEager】30,from:network,1541747139758,线程:main,日期:2018-11-09 15:05:39
注意: 要想实现这种效果,需要将 concatEager 各个 Observable 订阅在子线程中,而不要使用 concatEager 的线程,否则他们还是顺序在同一个线程执行,达不到同时执行的效果。
publish&replay&connect&share&refCount&autoConnect
从图中可以看出,这里面可以供使用者订阅的 Observable 可以分为四类,下面我们将逐一介绍这几种 Observable 的特点:
- 第一类:Cold Observable,就是我们通过 Observable.create、Observable.interval 等创建型操作符生成的 Observable。
- 第二类:由 Cold Observable 经过 publish() 或者 replay(int N) 操作符转换成的 ConnectableObservable。
- 第三类:由 ConnectableObservable 经过 refCount(),或者由 Cold Observable 经过 share() 转换成的 Observable。
- 第四类:由 ConnectableObservable 经过 autoConnect(int N) 转换成的 Observable。
一、Cold Observable & Hot Observable & ConnectableObservable
1、Cold Observable
过 Observable.create、Observable.interval 等创建型操作符生成的 Observable
- 当一个订阅者订阅 Cold Observable 时,Cold Observable 会重新开始发射数据给该订阅者。
- 当多个订阅者订阅到同一个 Cold Observable,它们收到的数据是相互独立的。
- 当一个订阅者取消订阅 Cold Observable 后,Cold Observable 会停止发射数据给该订阅者,但不会停止发射数据给其它订阅者。
2、ConnectableObservable
由
publish()
和replay(int N)
转换 Cold Observable 而来。
ConnectableObservable 的几个特点:
- 无论 ConnectableObservable 有没有订阅者,只要调用了 ConnectableObservable 的 connect 方法,Cold Observable 就开始发送数据,不调用 connect 不发射数据
- connect 会返回一个 Disposable 对象,调用了该对象的 dispose 方法,Cold Observable 将会停止发送数据,所有 ConnectableObservable 的订阅者也无法收到数据。
- 在调用 connect 返回的 Disposable 对象后,如果重新调用了 connect 方法,那么 Cold Observable 会重新发送数据。
- 当一个订阅者订阅到 ConnectableObservable 后,该订阅者会收到在订阅之后,Cold Observable 发送给 ConnectableObservable 的数据。
- 当多个订阅者订阅到同一个 ConnectableObservable 时,它们收到的数据是相同的。
- 当一个订阅者取消对 ConnectableObservable,不会影响其他订阅者收到消息。
3、由 ConnectableObservable 转换成 Observable
通过
.refCount()
或.autoConnect(int N)
生成
- refCount 生成的 Observable
- 第一个订阅者订阅到 refObservable 后,Cold Observable 开始发送数据。
- 之后的订阅者订阅到 refObservable 后,只能收到在订阅之后 Cold Observable 发送的数据。
- 如果一个订阅者取消订阅到 refObservable 后,假如它是当前 refObservable 的唯一一个订阅者,那么 Cold Observable 会停止发送数据;否则,Cold Observable 仍然会继续发送数据,其它的订阅者仍然可以收到 Cold Observable 发送的数据。
- autoConnect(int N) 转换成 Observable
- 当有 N 个订阅者订阅到 autoObservable 后,Cold Observable 开始发送数据,不足 N 个订阅者,不会发送数据。
- 之后的订阅者订阅到 autoObservable 后,只能收到在订阅之后 Cold Observable 发送的数据。
- 只要 Cold Observable 开始发送数据,即使所有的 autoObservable 的订阅和都取消了订阅,Cold Observable 也不会停止发送数据,如果想要 Cold Observable 停止发送数据,那么可以使用 autoConnect(int numberOfSubscribers, Consumer connection) 中 Consumer 返回的 Disposable,它的作用和 ConnectableObservable 的 connect 方法返回的 Disposable 相同。
二、小结
publish&replay&connect
通过 publish
和 replay
生成,然后调用 connect
开始发射数据
publish
使用.publish() 创建,那么订阅者只能收到在订阅之后 Cold Observable 发出的数据,订阅前的数据收不到。
生成 ConnectableObservable
replay(int N)
使用 reply(int N) 创建,那么订阅者在订阅后可以收到 Cold Observable 在订阅之前发送的 N 个数据和订阅后的数据
生成 ConnectableObservable。
connect
不管有没有订阅者,调用 connect 后,都会发射数据
三、refCount&autoConnect(int N)&share
refCount
将 ConnectableObservable 转换为 Observable,但还保持着 ConnectableObservable 特性,所有订阅者共享数据;所有订阅者取消订阅后,停止发射数据
autoConnect(int N)
将 ConnectableObservable 转换为 Observable,但还保持着 ConnectableObservable 特性,所有订阅者共享数据;需要 N 个订阅者订阅后才会发送数据;所有订阅者取消订阅后,继续发射数据
share
.share() 操作符是.publish().refcount(),调用 dispose 后,再次 connect 后重新订阅数据
Reference
- RxJava2 实战知识梳理 (12) - 实战讲解 publish & replay & share & refCount & autoConnect
https://www.jianshu.com/p/575ce5b98389