文章

连接操作符

连接操作符

concatXXX

concat 操作符

aflnb

concat 它会连接多个 Observable,并且必须要等到前一个 Observable 的所有数据项都发送完之后,才会开始下一个 Observable 数据的发送

将多个 Observable 合并成一个,zip 是多个 Observable 合并。

多个 Observable,依次合并各个 Observable。当中有 Observable onComplete() 后,直接跳过当前这个;如果当前 Observable 有 oError,那么会抛到 concat 的 onError。

案例:

1
2
3
4
5
6
7
8
9
10
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6, 8, 0))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.i(TAG, "concat : " + integer + "\n");
                sb.append("concat : " + integer + "\n");
                mTvOperatorResult1.setText(sb.toString());
            }
        });

结果:

1
2
3
4
5
6
7
8
concat : 1
concat : 2
concat : 3
concat : 4
concat : 5
concat : 6
concat : 8
concat : 0

concatEager

它和 concat 最大的不同就是多个 Observable 可以同时开始发射数据,如果后一个 Observable 发射完成后,前一个 Observable 还有发射完数据,那么它会将后一个 Observable 的数据先缓存起来,等到前一个 Observable 发射完毕后,才将缓存的数据发射出去。

zc0zf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void button1Click() {
    Observable<List<CacheDemoUtils.User>> just1 = CacheDemoUtils.getMemory().subscribeOn(Schedulers.io()); // 指定线程
    Observable<List<CacheDemoUtils.User>> just2 = CacheDemoUtils.getDisk().subscribeOn(Schedulers.io());
    Observable<List<CacheDemoUtils.User>> just3 = CacheDemoUtils.getNetwork().subscribeOn(Schedulers.io());
    List<Observable<List<CacheDemoUtils.User>>> justs = new ArrayList<>();
    justs.add(just1);
    justs.add(just2);
    justs.add(just3);

    Observable.concatEager(justs)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<CacheDemoUtils.User>>() {
                @Override
                public void accept(List<CacheDemoUtils.User> users) throws Exception {
                    sb.append("concatEager : " + users.size() + ",from:" + users.get(0).from + "," + System.currentTimeMillis() + "\n");
                    LogUtil.logi(TAG, "concatEager", users.size() + ",from:" + users.get(0).from + "," + System.currentTimeMillis() + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                }
            });
}

结果:

1
2
3
4
5
6
【getNetwork】开始从网络获取数据,需要耗时:8000,线程:RxCachedThreadScheduler-4,日期:2018-11-09 15:05:29
【getMemory】开始从内存获取数据,需要耗时:10000,线程:RxCachedThreadScheduler-2,日期:2018-11-09 15:05:29
【getDisk】开始从磁盘获取数据,需要耗时:5000,线程:RxCachedThreadScheduler-3,日期:2018-11-09 15:05:29
【concatEager】20,from:memory,1541747139752,线程:main,日期:2018-11-09 15:05:39
【concatEager】15,from:disk,1541747139756,线程:main,日期:2018-11-09 15:05:39
【concatEager】30,from:network,1541747139758,线程:main,日期:2018-11-09 15:05:39

注意: 要想实现这种效果,需要将 concatEager 各个 Observable 订阅在子线程中,而不要使用 concatEager 的线程,否则他们还是顺序在同一个线程执行,达不到同时执行的效果。

concatEager

它和 concat 最大的不同就是多个 Observable 可以同时开始发射数据,如果后一个 Observable 发射完成后,前一个 Observable 还有发射完数据,那么它会将后一个 Observable 的数据先缓存起来,等到前一个 Observable 发射完毕后,才将缓存的数据发射出去。

1y36x

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void button1Click() {
    Observable<List<CacheDemoUtils.User>> just1 = CacheDemoUtils.getMemory().subscribeOn(Schedulers.io()); // 指定线程
    Observable<List<CacheDemoUtils.User>> just2 = CacheDemoUtils.getDisk().subscribeOn(Schedulers.io());
    Observable<List<CacheDemoUtils.User>> just3 = CacheDemoUtils.getNetwork().subscribeOn(Schedulers.io());
    List<Observable<List<CacheDemoUtils.User>>> justs = new ArrayList<>();
    justs.add(just1);
    justs.add(just2);
    justs.add(just3);

    Observable.concatEager(justs)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<CacheDemoUtils.User>>() {
                @Override
                public void accept(List<CacheDemoUtils.User> users) throws Exception {
                    sb.append("concatEager : " + users.size() + ",from:" + users.get(0).from + "," + System.currentTimeMillis() + "\n");
                    LogUtil.logi(TAG, "concatEager", users.size() + ",from:" + users.get(0).from + "," + System.currentTimeMillis() + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                }
            });
}

结果:

1
2
3
4
5
6
【getNetwork】开始从网络获取数据,需要耗时:8000,线程:RxCachedThreadScheduler-4,日期:2018-11-09 15:05:29
【getMemory】开始从内存获取数据,需要耗时:10000,线程:RxCachedThreadScheduler-2,日期:2018-11-09 15:05:29
【getDisk】开始从磁盘获取数据,需要耗时:5000,线程:RxCachedThreadScheduler-3,日期:2018-11-09 15:05:29
【concatEager】20,from:memory,1541747139752,线程:main,日期:2018-11-09 15:05:39
【concatEager】15,from:disk,1541747139756,线程:main,日期:2018-11-09 15:05:39
【concatEager】30,from:network,1541747139758,线程:main,日期:2018-11-09 15:05:39

注意: 要想实现这种效果,需要将 concatEager 各个 Observable 订阅在子线程中,而不要使用 concatEager 的线程,否则他们还是顺序在同一个线程执行,达不到同时执行的效果。

publish&replay&connect&share&refCount&autoConnect

c6bin

从图中可以看出,这里面可以供使用者订阅的 Observable 可以分为四类,下面我们将逐一介绍这几种 Observable 的特点:

  1. 第一类:Cold Observable,就是我们通过 Observable.create、Observable.interval 等创建型操作符生成的 Observable。
  2. 第二类:由 Cold Observable 经过 publish() 或者 replay(int N) 操作符转换成的 ConnectableObservable。
  3. 第三类:由 ConnectableObservable 经过 refCount(),或者由 Cold Observable 经过 share() 转换成的 Observable。
  4. 第四类:由 ConnectableObservable 经过 autoConnect(int N) 转换成的 Observable。

一、Cold Observable & Hot Observable & ConnectableObservable

1、Cold Observable

过 Observable.create、Observable.interval 等创建型操作符生成的 Observable

  1. 当一个订阅者订阅 Cold Observable 时,Cold Observable 会重新开始发射数据给该订阅者。
  2. 当多个订阅者订阅到同一个 Cold Observable,它们收到的数据是相互独立的。
  3. 当一个订阅者取消订阅 Cold Observable 后,Cold Observable 会停止发射数据给该订阅者,但不会停止发射数据给其它订阅者。

2、ConnectableObservable

publish()replay(int N) 转换 Cold Observable 而来。

ConnectableObservable 的几个特点:

  1. 无论 ConnectableObservable 有没有订阅者,只要调用了 ConnectableObservable 的 connect 方法,Cold Observable 就开始发送数据,不调用 connect 不发射数据
  2. connect 会返回一个 Disposable 对象,调用了该对象的 dispose 方法,Cold Observable 将会停止发送数据,所有 ConnectableObservable 的订阅者也无法收到数据。
  3. 在调用 connect 返回的 Disposable 对象后,如果重新调用了 connect 方法,那么 Cold Observable 会重新发送数据。
  4. 当一个订阅者订阅到 ConnectableObservable 后,该订阅者会收到在订阅之后,Cold Observable 发送给 ConnectableObservable 的数据。
  5. 当多个订阅者订阅到同一个 ConnectableObservable 时,它们收到的数据是相同的。
  6. 当一个订阅者取消对 ConnectableObservable,不会影响其他订阅者收到消息。

3、由 ConnectableObservable 转换成 Observable

通过 .refCount().autoConnect(int N) 生成

  • refCount 生成的 Observable
  1. 第一个订阅者订阅到 refObservable 后,Cold Observable 开始发送数据。
  2. 之后的订阅者订阅到 refObservable 后,只能收到在订阅之后 Cold Observable 发送的数据。
  3. 如果一个订阅者取消订阅到 refObservable 后,假如它是当前 refObservable 的唯一一个订阅者,那么 Cold Observable 会停止发送数据;否则,Cold Observable 仍然会继续发送数据,其它的订阅者仍然可以收到 Cold Observable 发送的数据。
  • autoConnect(int N) 转换成 Observable
  1. 当有 N 个订阅者订阅到 autoObservable 后,Cold Observable 开始发送数据,不足 N 个订阅者,不会发送数据。
  2. 之后的订阅者订阅到 autoObservable 后,只能收到在订阅之后 Cold Observable 发送的数据。
  3. 只要 Cold Observable 开始发送数据,即使所有的 autoObservable 的订阅和都取消了订阅,Cold Observable 也不会停止发送数据,如果想要 Cold Observable 停止发送数据,那么可以使用 autoConnect(int numberOfSubscribers, Consumer connection) 中 Consumer 返回的 Disposable,它的作用和 ConnectableObservable 的 connect 方法返回的 Disposable 相同。

二、小结

publish&replay&connect
通过 publishreplay 生成,然后调用 connect 开始发射数据

publish

使用.publish() 创建,那么订阅者只能收到在订阅之后 Cold Observable 发出的数据,订阅前的数据收不到。

生成 ConnectableObservable

replay(int N)

使用 reply(int N) 创建,那么订阅者在订阅后可以收到 Cold Observable 在订阅之前发送的 N 个数据和订阅后的数据

生成 ConnectableObservable。

connect

不管有没有订阅者,调用 connect 后,都会发射数据

三、refCount&autoConnect(int N)&share

refCount

将 ConnectableObservable 转换为 Observable,但还保持着 ConnectableObservable 特性,所有订阅者共享数据;所有订阅者取消订阅后,停止发射数据

autoConnect(int N)

将 ConnectableObservable 转换为 Observable,但还保持着 ConnectableObservable 特性,所有订阅者共享数据;需要 N 个订阅者订阅后才会发送数据;所有订阅者取消订阅后,继续发射数据

share

.share() 操作符是.publish().refcount(),调用 dispose 后,再次 connect 后重新订阅数据

Reference

本文由作者按照 CC BY 4.0 进行授权