文章

异常处理操作符

异常处理操作符

Error handling 相关的操作符来集中统一地处理错误。RxJava 中错误处理的操作符为 CatchRetry

Retry

retry 的意思就是试着重来,当原始 Observable 发射 onError 通知时,retry 操作符不会让 onError 通知传递给观察者,它会重新订阅这个 Observable 一次或者多次 (意味着重新从头发射数据),所以可能造成数据项重复发送的情况。
如果重新订阅了指定的次数还是发射了 onError 通知,将不再尝试重新订阅,它会把最新的一个 onError 通知传递给观察者。
RxJava 中将 Retry 操作符的实现为 retryretryWhen 两种。

retry 操作符

2ayxq

  • retry 解释

retry 的意思就是试着重来,当原始 Observable 发射 onError 通知时,retry 操作符不会让 onError 通知传递给观察者,它会重新订阅这个 Observable 一次或者多次 (意味着重新从头发射数据),所以可能造成数据项重复发送的情况。
如果重新订阅了指定的次数还是发射了 onError 通知,将不再尝试重新订阅,它会把最新的一个 onError 通知传递给观察者。
retry 操作符默认在 trampoline 调度器上执行。

  • API
    • retry()
      无论收到多少次 onError 通知,都会继续订阅并重发原始 Observable,直到 onCompleted。
    • retry(long)
      接受 count 参数的 retry 会最多重新订阅 count 次,如果次数超过了就不会尝试再次订阅,它会把最新的一个 onError 通知传递给他的观察者。
    • retry(BiPredicate<Integer, Throwable>)
      这个版本的 retry 接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发射 onError 通知的 Throwable。这个函数返回一个布尔值,如果返回 true,retry 应该再次订阅和镜像原始的 Observable,如果返回 false,retry 会将最新的一个 onError 通知传递给它的观察者。
  • retry()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private void retryClick2() {
    Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 3; i++) {
                    if (i == 1) {
                        Log.v(TAG, "①retry()->onError");
                        emitter.onError(new RuntimeException("always fails"));
                    } else {
                        emitter.onNext(i);
                    }
                }
            }

        })
        .compose(bindToLifecycle())
        .retry(new BiPredicate<Integer, Throwable>() {
            @Override
            public boolean test(Integer integer, Throwable throwable) throws Exception {
                Log.v(TAG, "③发生错误了:" + throwable.getMessage() + ",第" + integer + "次重新订阅");
                if (integer > 2) {
                    return false;//不再重新订阅
                }
                //此处也可以通过判断throwable来控制不同的错误不同处理
                return true;
            }
        }) // 无限次尝试重新订阅
        .subscribe(new Observer<Integer>() {
            @Override
            public void onError(Throwable e) {
                String stackTraceString = Log.getStackTraceString(e);
                Log.e(TAG, "retry() -> onError:" + stackTraceString);
            }

            @Override
            public void onComplete() {
                Log.v(TAG, "retry() -> onCompleted");
            }

            @Override
            public void onSubscribe(Disposable d) {
                Log.v(TAG, "retry() -> onSubscribe:" + d.isDisposed());
            }

            @Override
            public void onNext(Integer integer) {
                Log.v(TAG, "retry() -> onNext:" + integer);
            }
        });
}

结果:

1
2
3
4
5
6
7
8
9
10
11
07-18 19:25:51.438 4922-4922/me.hacket.assistant V/RxJava2: retry() -> onSubscribe:false
07-18 19:25:51.446 4922-4922/me.hacket.assistant V/RxJava2: retry() -> onNext:0
    ①retry()->onError
07-18 19:25:51.447 4922-4922/me.hacket.assistant V/RxJava2: ③发生错误了:always fails,第1次重新订阅
    retry() -> onNext:0
    ①retry()->onError
    ③发生错误了:always fails,第2次重新订阅
07-18 19:25:51.448 4922-4922/me.hacket.assistant V/RxJava2: retry() -> onNext:0
    ①retry()->onError
    ③发生错误了:always fails,第3次重新订阅
07-18 19:25:51.449 4922-4922/me.hacket.assistant E/RxJava2: retry() -> onError:java.lang.RuntimeException: always fails

retryWhen 操作符

51ag7

  1. 上游通知 retryWhen 本次订阅流已经完成,询问其是否需要重订阅,该询问是以 onError 事件触发的
  2. 外层 retryWhen 时重头再来重试,flatMap、concatMap、switchMap 内部 retryWhen 时只有其内部创建的那个 Observable 重试
  3. Function 的输入是一个 Observable,我们可以结合 flatMap 根据上游发送的错误类型进行相应的处理
  4. 返回一个 ObservableSource,如果该 ObservableSource 返回 onComplete 或 onError,那么不会触发重订阅;如果发送 onNext,那么会触发重订阅,也就是说,它仅仅是作为一个是否要触发重订阅的通知,onNext 发送的是什么数据并不重要

retryWhen 的 Observable 发送了 onError 或 onComplete 时重试结束;发送了其他数据继续重试

retryWhen 和 retry 类似,区别是,retryWhen 将 onError 中的 Throwable 传递给一个函数,这个函数产生另一个 Observable,retryWhen 观察它的结果再决定是不是要重新订阅原始的 Observable。如果这个 Observable 发射了一项数据,它就重新订阅,如果这个 Observable 发射的是 onError 通知,它就将这个通知传递给观察者然后终止。

1
2
3
4
5
6
7
8
9
Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
    return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
        System.out.println("delay retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
    });
}).toBlocking().forEach(System.out::println);

Catch 操作符

Catch 操作符能够拦截原始 Observable 的 onError 通知,不让 Observable 因为产生错误而终止。相当于 Java 中 try/catch 操作,不能因为抛异常而导致程序崩溃。
RxJava 将 Catch 实现为三个不同的操作符:

  1. onErrorReturn/onErrorReturnItem:让 Observable 遇到错误时发射一个特殊的项并且正常终止
  2. onErrorResumeNext:让 Observable 在遇到 Throwable 时开始发射第二个 Observable 的数据序列
  3. onExceptionResumeNext:让 Observable 在遇到 Exception 时开始发射第二个 Observable 的数据序列

onErrorReturnXXX

onErrorReturnItem

kc6fh

  • 方法原型
    • public final Observable<T> onErrorReturnItem(final T item)
  • 源码
1
2
3
4
public final Observable<T> onErrorReturnItem(final T item) {
    ObjectHelper.requireNonNull(item, "item is null");
    return onErrorReturn(Functions.justFunction(item));
}
  • 解释

出现异常时,发送一个 item,而不是 error

  • 案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static Observable<Test.User> getGoogleDDL() {
return Observable.<Test.User>create(emitter -> {
            RxUtils.logi("-->>getGoogleDDL", " create sleep 6000");
            Test.User user = new Test.User();
            user.name = "大圣";
            user.age = 32;
            SleepTools.second(2);
            RxUtils.logw("---->>getGoogleDDL", " create sleep end, send user.");
            int i = 0;
            int j = 10 / i;
            if (!emitter.isDisposed()) {
                emitter.onNext(user);
                emitter.onComplete();
            }
        })
        .timeout(3, TimeUnit.SECONDS)
        .onErrorReturnItem(Test.User.error())
        .subscribeOn(Schedulers.io());
}

onErrorReturn

mh761

  • 解释
    1. onErrorReturn 方法返回一个原有 Observable 行为的新 Observable 镜像,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的 onCompleted 方法。
  • 方法原型
    • public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)
  • 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
private void onErrorReturnClick() {

    /*
     * ①.onErrorReturn:
     * 返回一个原有Observable行为的新Observable镜像,
     * 后者会忽略前者的onError调用,不会将错误传递给观察者,
     * 作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法
     */

    Observable

            .create(new ObservableOnSubscribe<Integer>() {

                @Override

                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                    for (int i = 0; i < 10; i++) {

                        if (i > 3) {

                            //会忽略onError调用,不会将错误传递给观察者

// emitter.onError(new Throwable("i太大了")); // 会崩溃,需要包装成RuntimeException

                            emitter.onError(Exceptions.propagate(new Throwable("i太大了")));

                        }

                        emitter.onNext(i);

                    }

                    emitter.onComplete();

                }

            })

            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    //作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法
                    return -1;
                }
            })

            .subscribe(new Observer<Integer>() {

                @Override

                public void onError(Throwable e) {

                    String stackTraceString = Log.getStackTraceString(e);

                    Log.e(TAG, "①onErrorReturn(Function)->onError:" + stackTraceString);

                }



                @Override

                public void onComplete() {

                    Log.v(TAG, "①onErrorReturn(Function)->onCompleted");

                }



                @Override

                public void onSubscribe(Disposable d) {

                    Log.v(TAG, "①onErrorReturn(Function)->onSubscribe:" + d.isDisposed());

                }



                @Override

                public void onNext(Integer integer) {

                    Log.v(TAG, "①onErrorReturn(Function)->onNext:" + integer);

                }

            });

}

结果:

1
2
3
4
5
6
7
07-18 18:11:29.888 29209-29209/me.hacket.assistant V/RxJava2: ①onErrorReturn(Function)->onSubscribe:false
①onErrorReturn(Function)->onNext:0
①onErrorReturn(Function)->onNext:1
①onErrorReturn(Function)->onNext:2
①onErrorReturn(Function)->onNext:3
①onErrorReturn(Function)->onNext:-1
①onErrorReturn(Function)->onCompleted

onErrorResumeNext 操作符

td1yn

  • 解释
    1. 让 Observable 在遇到错误时开始发射第二个 Observable 的数据序列;出现错误的序列不会走 onError
    2. 出现错误时会继续从 onErrorResumeNext 的 Observable 开始发送新的数据
  • 方法原型
    1. public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)

和 onErrorResumeNext(Observable) 相似,但他能截取到原 Observable 的 onError 消息

  1. public final Observable<T> onErrorResumeNext(final ObservableSource<? extends T> next)
  • 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
private void onErrorResumeNextClick() {
    /*
     * ②.onErrorResumeNext(Observable):
     * 当原Observable发射onError消息时,会忽略onError消息,不会传递给观察者;
     * 然后它会开始另一个备用的Observable,继续发射数据
     */
    Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                try {
                    for (int i = 0; i < 10; i++) {
                        if (i > 3) {
                            //会忽略onError调用,不会将错误传递给观察者
                            throw new Throwable("i太大了");
                        }
                        emitter.onNext(i);
                    }
                } catch (Throwable e) {
                    emitter.onError(Exceptions.propagate(e));
                }
                emitter.onComplete();
            }
        })
        .onErrorResumeNext(
            Observable.create(emitter -> {
                for (int i = 10; i < 13; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }))
        .subscribe(new Observer<Integer>() {
            @Override
            public void onError(Throwable e) {
                String stackTraceString = Log.getStackTraceString(e);
                Log.e(TAG, "①onErrorResumeNext(ObservableSource)->onError:" + stackTraceString);
            }
            @Override
            public void onComplete() {
                Log.v(TAG, "①onErrorResumeNext(ObservableSource)->onCompleted");
            }
            @Override
            public void onSubscribe(Disposable d) {
                Log.v(TAG, "①onErrorResumeNext(ObservableSource)->onSubscribe:" + d.isDisposed());
            }
            @Override
            public void onNext(Integer integer) {
                Log.v(TAG, "①onErrorResumeNext(ObservableSource)->onNext:" + integer);
            }
        });
}
// 2、onErrorResumeNext(Function)
private void onErrorResumeNextClick2() {
    /*
     * ③.onErrorResumeNext(Function):
     * 和onErrorResumeNext(Observable)相似,但他能截取到原Observable的onError消息
     */
    Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                try {
                    for (int i = 0; i < 10; i++) {
                        if (i > 3) {
                            //会忽略onError调用,不会将错误传递给观察者
                            throw new Throwable("i太大了");
                        }
                        emitter.onNext(i);
                    }
                } catch (Throwable throwable) {
                    emitter.onError(throwable);
                }
                emitter.onComplete();
            }
        })
        .onErrorResumeNext(new Function<Throwable, Observable<? extends Integer>>() {
            @Override
            public Observable<? extends Integer> apply(Throwable throwable) throws Exception {
                //throwable就是原Observable发射的onError消息中的Throwable对象
                Log.e(TAG, "③onErrorResumeNext(Func1)->throwable:" + throwable.getMessage());
                //如果原Observable发射了onError消息,将会开启下面的Observable
                return Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        for (int i = 100; i < 103; i++) {
                            emitter.onNext(i);
                        }
                        emitter.onComplete();
                    }
                });
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onError(Throwable e) {
                String stackTraceString = Log.getStackTraceString(e);
                Log.e(TAG, "①onErrorResumeNext(Function)->onError:" + stackTraceString);
            }
            @Override
            public void onComplete() {
                Log.v(TAG, "①onErrorResumeNext(Function)->onCompleted");
            }
            @Override
            public void onSubscribe(Disposable d) {
                Log.v(TAG, "①onErrorResumeNext(Function)->onSubscribe:" + d.isDisposed());
            }
            @Override
            public void onNext(Integer integer) {
                Log.v(TAG, "①onErrorResumeNext(Function)->onNext:" + integer);
            }
        });
}

结果:

1
2
3
4
5
6
7
8
9
①onErrorResumeNext(ObservableSource)->onSubscribe:false
①onErrorResumeNext(ObservableSource)->onNext:0
①onErrorResumeNext(ObservableSource)->onNext:1
①onErrorResumeNext(ObservableSource)->onNext:2
①onErrorResumeNext(ObservableSource)->onNext:3
①onErrorResumeNext(ObservableSource)->onNext:10
①onErrorResumeNext(ObservableSource)->onNext:11
①onErrorResumeNext(ObservableSource)->onNext:12
①onErrorResumeNext(ObservableSource)->onCompleted

onExceptionResumeNext 操作符

qhmlg

  • 解释
  1. 和 onErrorResumeNext 类似:让 Observable 在遇到错误时继续发射后面的数据项。 和 onErrorResumeNext 类似,onExceptionResumeNext 方法返回一个镜像原有 Observable 行为的新 Observable,也使用一个备用的 Observable,
  2. 如果 onError 收到的 Throwable 不是一个 Exception,它会将错误传递给观察者的 onError 方法,不会使用备用的 Observable;onErrorResumeNext 收到 Error 时不会走 onError 方法
  • 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
private void onExceptionResumeNextClick() {

    /*

     * ④.onExceptionResumeNext:

     * 和onErrorResumeNext类似,可以说是onErrorResumeNext的特例,

     * 区别是如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。

     */

    Observable

            .create(new ObservableOnSubscribe<Integer>() {

                @Override

                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                    try {

                        for (int i = 0; i < 10; i++) {

                            if (i > 3) {

                                // 如果不是Exception,错误会传递给观察者,不会开启备用Observable

                                throw new Throwable("i太大了");

                                // 如果Exception,不会将错误传递给观察者,并会开启备用Observable

// throw new Exception("i太大了哦哦哦");

                            }

                            emitter.onNext(i);

                        }

                    } catch (Throwable e) {

                        emitter.onError(e);

// emitter.onError(Exceptions.propagate(e));

                    }

                    emitter.onComplete();

                }



            })

            .onExceptionResumeNext(Observable.create(new ObservableOnSubscribe<Integer>() {

                @Override

                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                    for (int i = 10; i < 13; i++) {

                        emitter.onNext(i);

                    }

                    emitter.onComplete();

                }



            }))

            .subscribe(new Observer<Integer>() {

                @Override

                public void onError(Throwable e) {

                    String stackTraceString = Log.getStackTraceString(e);

                    Log.e(TAG, "①onExceptionResumeNext(ObservableSource)->onError:" + stackTraceString);

                }



                @Override

                public void onComplete() {

                    Log.v(TAG, "①onExceptionResumeNext(ObservableSource)->onCompleted");

                }



                @Override

                public void onSubscribe(Disposable d) {

                    Log.v(TAG, "①onExceptionResumeNext(ObservableSource)->onSubscribe:" + d.isDisposed());

                }



                @Override

                public void onNext(Integer integer) {

                    Log.v(TAG, "①onExceptionResumeNext(ObservableSource)->onNext:" + integer);

                }

            });

}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 1、异常是Exception

07-18 18:47:16.920 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onSubscribe:false

    ①onExceptionResumeNext(ObservableSource)->onNext:0

07-18 18:47:16.921 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onNext:1

    ①onExceptionResumeNext(ObservableSource)->onNext:2

    ①onExceptionResumeNext(ObservableSource)->onNext:3

07-18 18:47:16.923 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onNext:10

    ①onExceptionResumeNext(ObservableSource)->onNext:11

    ①onExceptionResumeNext(ObservableSource)->onNext:12

    ①onExceptionResumeNext(ObservableSource)->onCompleted



// 2、异常是Throwable

07-18 18:50:40.731 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onSubscribe:false

    ①onExceptionResumeNext(ObservableSource)->onNext:0

    ①onExceptionResumeNext(ObservableSource)->onNext:1

07-18 18:50:40.732 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onNext:2

    ①onExceptionResumeNext(ObservableSource)->onNext:3

07-18 18:50:40.732 338-338/me.hacket.assistant E/RxJava2: ①onExceptionResumeNext(ObservableSource)->onError:java.lang.Throwable: i太大了

xxxDelayError()

在进行一些合并操作时,如果碰到某个 Observable 发送了 Error 事件,则操作就会终止. 这时候如果需要先暂时忽略错误,将相应的操作进行完后再将发送 Error 事件,测可以用该方法对应的 DelayError 版本的方法.

很多函数都有提供 DelayError 版本的方法, 比如:
combineLatestDelayError,concatDelayError, mergeDelayError, concatMapDelayError, switchMapDelayError, switchOnNextDelayError.

concatDelayError

Observable.concat 是将几个 Observable 的数据合并,如下所示,第一个 Observable 除了发射数据外,还会发射一个 Error,如果使用 concat, 则无法合并第二个 Observable 的内容.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable<String> obs1 = Observable.create(s -> {
    s.onNext("a1");
    s.onNext("a2");
    s.onNext("a3");
    s.onError(new Throwable("error from obs1"));
});

Observable<String> obs2 = Observable.just("b1", "b2", "b3");

Observable.concatDelayError(Arrays.asList(obs1, obs2)).subscribe(
        System.out::println
        , e -> System.out.println(e.getMessage())
        , () -> System.out.println("onCompleted")
);
本文由作者按照 CC BY 4.0 进行授权