文章

合并操作符

合并操作符

zip 操作符

zip 操作符介绍

7st9y

zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的;

最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同。

zip 总结

  1. zip 默认在同一个线程,getStringObservable 发送完毕后,getIntegerObservable 再发送,每发送一个,zip 就组合一个,组合结果再发送给订阅者;
  2. 如何让 2 个 Observable 同时发射数据,订阅在其他线程同时开始发送, 每发送一个, zip 就组合一个, 再将组合结果发送给订阅者.
  3. 如果 zip 添加了线程调度器,那么 zip 的所有 Observable 都在该线程发射数据;如果每个待 zip 的 Observable 都有自己的调度器,那么各自运行在各自的调度器上;每次 zip 的最后一个 Observable(不是逻辑位置上的最后,而是时间上最后发射数据的 Observable) 会影响 zip 后的操作符调度线程,没有 observeOn,会调度在最后一个 Observable 线程上;如果需要后续操作符线程调度不受最后一个 Observable 线程影响,需要调用 observeOn 来切换线程。
  4. zip 需要每次等待所有 Observable 数据发送,最终来合并,快的 Observable 可能会全部发射完数据再和其他的 Observable 合并;combineLatest 只需要第一次所有 Observable 发射一次数据后,后续不需要所有的 Observable 发射数据才合并,每个 Observable 发射一条数据都合并,即第一次后不需要等待所有的 Observable 了。
  5. zip 的 source Observable 尽量不要写 onCompleted(),会导致短的先 dispose 了,导致出错;如果都只有 1 个数据,加上 onComplete 的没关系

案例:zip 测试

source1 和 source2 没有自己的调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private static void testZip() {
    long s1 = System.currentTimeMillis();
    Observable.zip(getStringObservable(), getIntegerObservable(), (s, i) -> {
                RxUtils.logi("\tzip apply", "s = " + s + ", i = " + i);
                return s + "-" + i;
            })
            .subscribeOn(MyScheduler.create("zip-subscribeOn"))
            .observeOn(MyScheduler.create("zip-observeOn"))
            .subscribe(s -> RxUtils.logi("\tsubscribe onNext", "onNext: " + s + " cost:" + (System.currentTimeMillis() - s1) + "ms  "),
                    throwable -> RxUtils.logw("\tsubscribe onError", "onError: " + throwable.getMessage()),
                    () -> RxUtils.logi("\tsubscribe onComplete", "onComplete" + " cost:" + (System.currentTimeMillis() - s1) + "ms  "));
}
private static Observable<String> getStringObservable() {
    return Observable.create((ObservableOnSubscribe<String>) e -> {
        if (!e.isDisposed()) {
            RxUtils.logi("getStringObservable", "emit String: A ");
            e.onNext("A");
        }
        if (!e.isDisposed()) {
            Thread.sleep(1000);
            RxUtils.logi("getStringObservable", "sleep 1000 emit String: B ");
            e.onNext("B");
        }
        if (!e.isDisposed()) {
            Thread.sleep(5000);
            RxUtils.logi("getStringObservable", "sleep 5000 emit String: C ");
            e.onNext("C");
        }
    });
}
private static Observable<Integer> getIntegerObservable() {
    return Observable.create((ObservableOnSubscribe<Integer>) e -> {
        if (!e.isDisposed()) {
            RxUtils.logi("getIntegerObservable", "emit Integer : 1 ");
            e.onNext(1);
        }
        if (!e.isDisposed()) {
            Thread.sleep(4000);
            RxUtils.logi("getIntegerObservable", "sleep 4000 emit Integer: 2");
            e.onNext(2);
        }
    });
}

结果:

【getStringObservable】emit String: A 【zip-subscribeOn-1 # 2024-01-12-04:25:16】 【getStringObservable】sleep 1000 emit String: B 【zip-subscribeOn-1 # 2024-01-12-04:25:17】 【getStringObservable】sleep 5000 emit String: C 【zip-subscribeOn-1 # 2024-01-12-04:25:22】 【getIntegerObservable】emit Integer : 1 【zip-subscribeOn-1 # 2024-01-12-04:25:22】 【 zip apply】s = A, i = 1【zip-subscribeOn-1 # 2024-01-12-04:25:22】 【 subscribe onNext】onNext: A-1 cost:6116ms 【zip-observeOn-2 # 2024-01-12-04:25:22】 【getIntegerObservable】sleep 4000 emit Integer: 2【zip-subscribeOn-1 # 2024-01-12-04:25:26】 【 zip apply】s = B, i = 2【zip-subscribeOn-1 # 2024-01-12-04:25:26】 【 subscribe onNext】onNext: B-2 cost:10122ms 【zip-observeOn-2 # 2024-01-12-04:25:26】

分析:

  • 由于 getStringObservable() 和 getStringObservable() 都是没有指定线程调度器,用的 zip 上的调度器;且是等 getStringObservable() 全部发送完毕后,再去执行 getIntegerObservable() 的,是串行的
  • getStringObservable() 发送完所有数据需要 6 秒,然后 getIntegerObservable() 发送 1 个数据,开始 zip 第 1 个数据,4 秒后 zip 第 2 个数据;总共耗时 10 秒,因为是串行的,所以是 2 个 source 的耗时之和

source1 和 source2 有自己的调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private static void testZip() {
    long s1 = System.currentTimeMillis();
    Observable.zip(getStringObservable(), getIntegerObservable(), (s, i) -> {
                RxUtils.logi("\tzip apply", "s = " + s + ", i = " + i);
                return s + "-" + i;
            })
            .map(s -> {
                RxUtils.logi("\tmap apply", "s = " + s);
                return s;
            })
            .observeOn(MyScheduler.create("zip-observeOn-0"))
            .subscribeOn(MyScheduler.create("zip-subscribeOn"))
            .observeOn(MyScheduler.create("zip-observeOn"))
            .subscribe(s -> RxUtils.logi("\tsubscribe onNext", "onNext: " + s + " cost:" + (System.currentTimeMillis() - s1) + "ms  "),
                    throwable -> RxUtils.logw("\tsubscribe onError", "onError: " + throwable.getMessage()),
                    () -> RxUtils.logi("\tsubscribe onComplete", "onComplete" + " cost:" + (System.currentTimeMillis() - s1) + "ms  "));
}
private static Observable<String> getStringObservable() {
    return Observable.create((ObservableOnSubscribe<String>) e -> {
                if (!e.isDisposed()) {
                    RxUtils.logi("getStringObservable", "emit String: A ");
                    e.onNext("A");
                }
                if (!e.isDisposed()) {
                    Thread.sleep(1000);
                    RxUtils.logi("getStringObservable", "sleep 1000 emit String: B ");
                    e.onNext("B");
                }
                if (!e.isDisposed()) {
                    Thread.sleep(5000);
                    RxUtils.logi("getStringObservable", "sleep 5000 emit String: C ");
                    e.onNext("C");
                }
            })
            .subscribeOn(MyScheduler.create("string-subscribeOn"));
}
private static Observable<Integer> getIntegerObservable() {
    return Observable.create((ObservableOnSubscribe<Integer>) e -> {
                if (!e.isDisposed()) {
                    RxUtils.logi("getIntegerObservable", "emit Integer : 1 ");
                    e.onNext(1);
                }
                if (!e.isDisposed()) {
                    Thread.sleep(4000);
                    RxUtils.logi("getIntegerObservable", "sleep 4000 emit Integer: 2");
                    e.onNext(2);
                }
            })
            .subscribeOn(MyScheduler.create("int-subscribeOn"));
}

结果:

【getStringObservable】emit String: A 【string-subscribeOn-2 # 2024-01-12-05:12:15】 【getIntegerObservable】emit Integer : 1 【int-subscribeOn-3 # 2024-01-12-05:12:15】 【 zip apply】s = A, i = 1【string-subscribeOn-2 # 2024-01-12-05:12:15】 【 map apply】s = A-1【string-subscribeOn-2 # 2024-01-12-05:12:15】 【 subscribe onNext】onNext: A-1 cost:105ms 【zip-observeOn-5 # 2024-01-12-05:12:15】 【getStringObservable】sleep 1000 emit String: B 【string-subscribeOn-2 # 2024-01-12-05:12:16】 【getIntegerObservable】sleep 4000 emit Integer: 2【int-subscribeOn-3 # 2024-01-12-05:12:19】 【 zip apply】s = B, i = 2【int-subscribeOn-3 # 2024-01-12-05:12:19】 【 map apply】s = B-2【int-subscribeOn-3 # 2024-01-12-05:12:19】 【 subscribe onNext】onNext: B-2 cost:4106ms 【zip-observeOn-5 # 2024-01-12-05:12:19】 【getStringObservable】sleep 5000 emit String: C 【string-subscribeOn-2 # 2024-01-12-05:12:21】

分析:

  • 由于 getStringObservable() 和 getStringObservable() 指定了自己的线程调度器;他们是并行的
  • zip 操作符的 zipper 是在最后完成的线程中执行,getStringObservable 最后完成就是其所在的线程,否则在 getIntegerObservable() 所在的线程执行

案例 2:多个请求同时请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
Observable.zip(getGoogleInstallReferer(), getFacebookDDL(), getGoogleDDL(), (installReferer, facebookDDL, googleDDL) -> {
        RxUtils.logi("------>>>>zip onNext", "installReferer=" + installReferer + ",facebookDDL=" + facebookDDL + ",googleDDL:" + googleDDL);
        if (installReferer != null) { // Google Install Referer优先级最高
            return installReferer;
        }
        if (facebookDDL != null) {
            return facebookDDL;
        }
        if (googleDDL != null) {
            return googleDDL;
        }
        return "";
    })
    .map(new Function<Object, String>() {
        @Override
        public String apply(Object o) throws Exception {
            return o.toString();
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.single())
    .subscribe(o -> {
        RxUtils.logi("-------->>>>subscribe onNext", "String: " + o);
    }, throwable -> {
        RxUtils.logi("-------->>>>subscribe onError", throwable.getMessage());
    }, () -> {
        RxUtils.logi("-------->>>>subscribe onComplete", "onComplete");
    });

public static Observable<String> getGoogleInstallReferer() {
    return Observable.<String>create(emitter -> {
                RxUtils.logi("-->>getGoogleInstallReferer", " create");
                requestReferer(new Callback() {
                    @Override
                    public void onResult(String referer) {
                        emitter.onNext(referer);
                    }
                });
//                    String s = null;
//                    s.toString();
//                    if (!emitter.isDisposed()) {
//                        emitter.onNext("hacket");
//                        emitter.onComplete();
//                    }
            })
            .onErrorReturnItem("npe")
            .timeout(3, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io());
//                .subscribeOn(Schedulers.newThread());
}

// 在rxjava create中,如果切换了线程,那么就会在切换的线程中执行,如果没有切换线程,那么就会在subscribeOn中的线程中执行
private static void requestReferer(Callback callback) {
    new Thread() {
        @Override
        public void run() {
            super.run();
            RxUtils.logi("---->>requestReferer", " start, sleep 2s");
            SleepTools.second(2);
            RxUtils.logi("---->>requestReferer", " end, to main");
            Schedulers.single().scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    callback.onResult("google install referer.");
                }
            });
        }
    }.start();
}
public interface Callback {
    void onResult(String referer);
}

public static Observable<Integer> getFacebookDDL() {
    return Observable.<Integer>create(emitter -> {
                RxUtils.logi("-->>getFacebookDDL", " create sleep 2000");
                SleepTools.second(1);
                RxUtils.logi("---->>getFacebookDDL", " create sleep end, send 18");
                int i = 0;
                int j = 10 / i;
                if (!emitter.isDisposed()) {
                    emitter.onNext(18);
                    emitter.onComplete();
                }
            })
            .timeout(3, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .onErrorReturnItem(-1);
//                .subscribeOn(Schedulers.computation());
}

public static Observable<Test.User> getGoogleDDL() {
    return Observable.<Test.User>create(emitter -> {
                RxUtils.logi("-->>getGoogleDDL", " create sleep 6000");


                Test.User user = new Test.User();
                user.name = "大圣";
                user.age = 32;
                SleepTools.second(1);
                RxUtils.logi("---->>getGoogleDDL", " create sleep end, send user.");
//                    int i = 0;
//                    int j = 10/i;
                if (!emitter.isDisposed()) {
                    emitter.onNext(user);
                    emitter.onComplete();
                }
            })
            .timeout(3, TimeUnit.SECONDS)
            .onErrorReturnItem(Test.User.error())
            /*.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Test.User>>() {
        @Override
        public ObservableSource<? extends Test.User> apply(Throwable throwable) throws Exception {
            return Observable.just(new Test.User());
        }
    })*/
            .subscribeOn(Schedulers.io());
//                .subscribeOn(Schedulers.trampoline());
}

mergeXXX

merge

3kly8
merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

  • merge 是将多个合并成一个,zip 是多个同时请求,有多个结果

案例 1- 顺序:

1
2
3
4
5
6
7
8
9
10
11
private void mergeClick() {
    Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    sb.append("merge :" + integer + "\n");
                     mTvOperatorResult1.setText(sb.toString());
                    Log.i(TAG, "accept: merge :" + integer + "\n");
                }
            });
}

结果:

1
2
3
4
5
1
2
3
4
5
  • 2、案例 2- 交错
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void mergeClick1() {
    Observable.merge(observabl1(), observabl2())
            .compose(RxUtils.applyObservableSchedulers())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    sb.append("merge :" + integer + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                    Log.w(TAG, sb.toString());
                }
            });
}

private Observable<Integer> observabl1() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            SystemClock.sleep(1000);
            emitter.onNext(3);
            SystemClock.sleep(1000);
            emitter.onNext(5);
        }
    }).subscribeOn(Schedulers.io());
}

private Observable<Integer> observabl2() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            SystemClock.sleep(500);
            emitter.onNext(2);
            SystemClock.sleep(1000);
            emitter.onNext(4);
            SystemClock.sleep(1000);
            emitter.onNext(6);
        }
    }).subscribeOn(Schedulers.io());
}

结果:

1
2
3
4
5
6
merge :1
merge :2
merge :3
merge :4
merge :5
merge :6

mergeWith

rucqi
同 merge,调用的是 merge2 个参数的方法

1
2
3
4
public final Observable<T> mergeWith(ObservableSource<? extends T> other) {
    ObjectHelper.requireNonNull(other, "other is null");
    return merge(this, other);
}

merge(…)

mergeArray()

combineLatest

anzzb
combineLatest 是 RxJava 本身提供的一个常用的操作符,它接受两个或以上的 Observable 和一个 FuncX 闭包。当传入的 Observable 中任意的一个发射数据时,combineLatest 将每个 Observable 的最近值 (Lastest) 联合起来(combine)传给 FuncX 闭包进行处理。要点在于

  1. combineLatest 是会存储每个 Observable 的最近的值的
  2. 任意一个 Observable 发射新值时都会触发操作 ->”combine all the Observable’s lastest value together and send to Function”

可用于表单校验

注意
combineLatest 任意一个 Observable 发射数据之后,会去取其它 Observable 最近一次发射的数据,回调到函数当中,但是该函数回调的前提是所有的 Observable 都至少发射过一个数据项,以后就不用每次都要发射数据了
和 zip 区别:
zip 是当原始 Observable 中每一个都发射了一条数据时才发射数据。

startWith

给你被观察者的数据流前再增加一点同类型的数据,这里增加的是 1,2

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.just(3, 4, 5).startWith(1, 2)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i("xbh", integer + "");
            }
        });
// 结果
1
2
3
4
5
本文由作者按照 CC BY 4.0 进行授权