异常处理操作符
Error handling 相关的操作符来集中统一地处理错误。RxJava 中错误处理的操作符为 Catch
和 Retry
Retry
retry 的意思就是试着重来,当原始 Observable 发射 onError 通知时,retry 操作符不会让 onError 通知传递给观察者,它会重新订阅这个 Observable 一次或者多次 (意味着重新从头发射数据),所以可能造成数据项重复发送的情况。
如果重新订阅了指定的次数还是发射了 onError 通知,将不再尝试重新订阅,它会把最新的一个 onError 通知传递给观察者。
RxJava 中将 Retry 操作符的实现为 retry
和 retryWhen
两种。
retry 操作符
- retry 解释
retry 的意思就是试着重来,当原始 Observable 发射 onError 通知时,retry 操作符不会让 onError 通知传递给观察者,它会重新订阅这个 Observable 一次或者多次 (意味着重新从头发射数据),所以可能造成数据项重复发送的情况。
如果重新订阅了指定的次数还是发射了 onError 通知,将不再尝试重新订阅,它会把最新的一个 onError 通知传递给观察者。
retry 操作符默认在 trampoline 调度器上执行。
- API
- retry()
无论收到多少次 onError 通知,都会继续订阅并重发原始 Observable,直到 onCompleted。 - retry(long)
接受 count 参数的 retry 会最多重新订阅 count 次,如果次数超过了就不会尝试再次订阅,它会把最新的一个 onError 通知传递给他的观察者。 - retry(BiPredicate<Integer, Throwable>)
这个版本的 retry 接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发射 onError 通知的 Throwable。这个函数返回一个布尔值,如果返回 true,retry 应该再次订阅和镜像原始的 Observable,如果返回 false,retry 会将最新的一个 onError 通知传递给它的观察者。
- retry()
- retry()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private void retryClick2() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 3; i++) {
if (i == 1) {
Log.v(TAG, "①retry()->onError");
emitter.onError(new RuntimeException("always fails"));
} else {
emitter.onNext(i);
}
}
}
})
.compose(bindToLifecycle())
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
Log.v(TAG, "③发生错误了:" + throwable.getMessage() + ",第" + integer + "次重新订阅");
if (integer > 2) {
return false;//不再重新订阅
}
//此处也可以通过判断throwable来控制不同的错误不同处理
return true;
}
}) // 无限次尝试重新订阅
.subscribe(new Observer<Integer>() {
@Override
public void onError(Throwable e) {
String stackTraceString = Log.getStackTraceString(e);
Log.e(TAG, "retry() -> onError:" + stackTraceString);
}
@Override
public void onComplete() {
Log.v(TAG, "retry() -> onCompleted");
}
@Override
public void onSubscribe(Disposable d) {
Log.v(TAG, "retry() -> onSubscribe:" + d.isDisposed());
}
@Override
public void onNext(Integer integer) {
Log.v(TAG, "retry() -> onNext:" + integer);
}
});
}
结果:
1
2
3
4
5
6
7
8
9
10
11
07-18 19:25:51.438 4922-4922/me.hacket.assistant V/RxJava2: retry() -> onSubscribe:false
07-18 19:25:51.446 4922-4922/me.hacket.assistant V/RxJava2: retry() -> onNext:0
①retry()->onError
07-18 19:25:51.447 4922-4922/me.hacket.assistant V/RxJava2: ③发生错误了:always fails,第1次重新订阅
retry() -> onNext:0
①retry()->onError
③发生错误了:always fails,第2次重新订阅
07-18 19:25:51.448 4922-4922/me.hacket.assistant V/RxJava2: retry() -> onNext:0
①retry()->onError
③发生错误了:always fails,第3次重新订阅
07-18 19:25:51.449 4922-4922/me.hacket.assistant E/RxJava2: retry() -> onError:java.lang.RuntimeException: always fails
retryWhen 操作符
- 上游通知 retryWhen 本次订阅流已经完成,询问其是否需要重订阅,该询问是以 onError 事件触发的
- 外层 retryWhen 时重头再来重试,flatMap、concatMap、switchMap 内部 retryWhen 时只有其内部创建的那个 Observable 重试
- Function 的输入是一个 Observable,我们可以结合 flatMap 根据上游发送的错误类型进行相应的处理
- 返回一个 ObservableSource,如果该 ObservableSource 返回 onComplete 或 onError,那么不会触发重订阅;如果发送 onNext,那么会触发重订阅,也就是说,它仅仅是作为一个是否要触发重订阅的通知,onNext 发送的是什么数据并不重要
retryWhen 的 Observable 发送了 onError 或 onComplete 时重试结束;发送了其他数据继续重试
retryWhen 和 retry 类似,区别是,retryWhen 将 onError 中的 Throwable 传递给一个函数,这个函数产生另一个 Observable,retryWhen 观察它的结果再决定是不是要重新订阅原始的 Observable。如果这个 Observable 发射了一项数据,它就重新订阅,如果这个 Observable 发射的是 onError 通知,它就将这个通知传递给观察者然后终止。
1
2
3
4
5
6
7
8
9
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
Catch 操作符
Catch 操作符能够拦截原始 Observable 的 onError 通知,不让 Observable 因为产生错误而终止。相当于 Java 中 try/catch 操作,不能因为抛异常而导致程序崩溃。
RxJava 将 Catch 实现为三个不同的操作符:
- onErrorReturn/onErrorReturnItem:让 Observable 遇到错误时发射一个特殊的项并且正常终止
- onErrorResumeNext:让 Observable 在遇到 Throwable 时开始发射第二个 Observable 的数据序列
- onExceptionResumeNext:让 Observable 在遇到 Exception 时开始发射第二个 Observable 的数据序列
onErrorReturnXXX
onErrorReturnItem
- 方法原型
public final Observable<T> onErrorReturnItem(final T item)
- 源码
1
2
3
4
public final Observable<T> onErrorReturnItem(final T item) {
ObjectHelper.requireNonNull(item, "item is null");
return onErrorReturn(Functions.justFunction(item));
}
- 解释
出现异常时,发送一个 item,而不是 error
- 案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static Observable<Test.User> getGoogleDDL() {
return Observable.<Test.User>create(emitter -> {
RxUtils.logi("-->>getGoogleDDL", " create sleep 6000");
Test.User user = new Test.User();
user.name = "大圣";
user.age = 32;
SleepTools.second(2);
RxUtils.logw("---->>getGoogleDDL", " create sleep end, send user.");
int i = 0;
int j = 10 / i;
if (!emitter.isDisposed()) {
emitter.onNext(user);
emitter.onComplete();
}
})
.timeout(3, TimeUnit.SECONDS)
.onErrorReturnItem(Test.User.error())
.subscribeOn(Schedulers.io());
}
onErrorReturn
- 解释
- onErrorReturn 方法返回一个原有 Observable 行为的新 Observable 镜像,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的 onCompleted 方法。
- 方法原型
public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)
- 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
private void onErrorReturnClick() {
/*
* ①.onErrorReturn:
* 返回一个原有Observable行为的新Observable镜像,
* 后者会忽略前者的onError调用,不会将错误传递给观察者,
* 作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法
*/
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
if (i > 3) {
//会忽略onError调用,不会将错误传递给观察者
// emitter.onError(new Throwable("i太大了")); // 会崩溃,需要包装成RuntimeException
emitter.onError(Exceptions.propagate(new Throwable("i太大了")));
}
emitter.onNext(i);
}
emitter.onComplete();
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
//作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法
return -1;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onError(Throwable e) {
String stackTraceString = Log.getStackTraceString(e);
Log.e(TAG, "①onErrorReturn(Function)->onError:" + stackTraceString);
}
@Override
public void onComplete() {
Log.v(TAG, "①onErrorReturn(Function)->onCompleted");
}
@Override
public void onSubscribe(Disposable d) {
Log.v(TAG, "①onErrorReturn(Function)->onSubscribe:" + d.isDisposed());
}
@Override
public void onNext(Integer integer) {
Log.v(TAG, "①onErrorReturn(Function)->onNext:" + integer);
}
});
}
结果:
1
2
3
4
5
6
7
07-18 18:11:29.888 29209-29209/me.hacket.assistant V/RxJava2: ①onErrorReturn(Function)->onSubscribe:false
①onErrorReturn(Function)->onNext:0
①onErrorReturn(Function)->onNext:1
①onErrorReturn(Function)->onNext:2
①onErrorReturn(Function)->onNext:3
①onErrorReturn(Function)->onNext:-1
①onErrorReturn(Function)->onCompleted
onErrorResumeNext 操作符
- 解释
- 让 Observable 在遇到错误时开始发射第二个 Observable 的数据序列;出现错误的序列不会走 onError
- 出现错误时会继续从 onErrorResumeNext 的 Observable 开始发送新的数据
- 方法原型
public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)
和 onErrorResumeNext(Observable) 相似,但他能截取到原 Observable 的 onError 消息
public final Observable<T> onErrorResumeNext(final ObservableSource<? extends T> next)
- 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
private void onErrorResumeNextClick() {
/*
* ②.onErrorResumeNext(Observable):
* 当原Observable发射onError消息时,会忽略onError消息,不会传递给观察者;
* 然后它会开始另一个备用的Observable,继续发射数据
*/
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
try {
for (int i = 0; i < 10; i++) {
if (i > 3) {
//会忽略onError调用,不会将错误传递给观察者
throw new Throwable("i太大了");
}
emitter.onNext(i);
}
} catch (Throwable e) {
emitter.onError(Exceptions.propagate(e));
}
emitter.onComplete();
}
})
.onErrorResumeNext(
Observable.create(emitter -> {
for (int i = 10; i < 13; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}))
.subscribe(new Observer<Integer>() {
@Override
public void onError(Throwable e) {
String stackTraceString = Log.getStackTraceString(e);
Log.e(TAG, "①onErrorResumeNext(ObservableSource)->onError:" + stackTraceString);
}
@Override
public void onComplete() {
Log.v(TAG, "①onErrorResumeNext(ObservableSource)->onCompleted");
}
@Override
public void onSubscribe(Disposable d) {
Log.v(TAG, "①onErrorResumeNext(ObservableSource)->onSubscribe:" + d.isDisposed());
}
@Override
public void onNext(Integer integer) {
Log.v(TAG, "①onErrorResumeNext(ObservableSource)->onNext:" + integer);
}
});
}
// 2、onErrorResumeNext(Function)
private void onErrorResumeNextClick2() {
/*
* ③.onErrorResumeNext(Function):
* 和onErrorResumeNext(Observable)相似,但他能截取到原Observable的onError消息
*/
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
try {
for (int i = 0; i < 10; i++) {
if (i > 3) {
//会忽略onError调用,不会将错误传递给观察者
throw new Throwable("i太大了");
}
emitter.onNext(i);
}
} catch (Throwable throwable) {
emitter.onError(throwable);
}
emitter.onComplete();
}
})
.onErrorResumeNext(new Function<Throwable, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> apply(Throwable throwable) throws Exception {
//throwable就是原Observable发射的onError消息中的Throwable对象
Log.e(TAG, "③onErrorResumeNext(Func1)->throwable:" + throwable.getMessage());
//如果原Observable发射了onError消息,将会开启下面的Observable
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 100; i < 103; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onError(Throwable e) {
String stackTraceString = Log.getStackTraceString(e);
Log.e(TAG, "①onErrorResumeNext(Function)->onError:" + stackTraceString);
}
@Override
public void onComplete() {
Log.v(TAG, "①onErrorResumeNext(Function)->onCompleted");
}
@Override
public void onSubscribe(Disposable d) {
Log.v(TAG, "①onErrorResumeNext(Function)->onSubscribe:" + d.isDisposed());
}
@Override
public void onNext(Integer integer) {
Log.v(TAG, "①onErrorResumeNext(Function)->onNext:" + integer);
}
});
}
结果:
1
2
3
4
5
6
7
8
9
①onErrorResumeNext(ObservableSource)->onSubscribe:false
①onErrorResumeNext(ObservableSource)->onNext:0
①onErrorResumeNext(ObservableSource)->onNext:1
①onErrorResumeNext(ObservableSource)->onNext:2
①onErrorResumeNext(ObservableSource)->onNext:3
①onErrorResumeNext(ObservableSource)->onNext:10
①onErrorResumeNext(ObservableSource)->onNext:11
①onErrorResumeNext(ObservableSource)->onNext:12
①onErrorResumeNext(ObservableSource)->onCompleted
onExceptionResumeNext 操作符
- 解释
- 和 onErrorResumeNext 类似:让 Observable 在遇到错误时继续发射后面的数据项。 和 onErrorResumeNext 类似,onExceptionResumeNext 方法返回一个镜像原有 Observable 行为的新 Observable,也使用一个备用的 Observable,
- 如果 onError 收到的 Throwable 不是一个 Exception,它会将错误传递给观察者的 onError 方法,不会使用备用的 Observable;onErrorResumeNext 收到 Error 时不会走 onError 方法
- 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
private void onExceptionResumeNextClick() {
/*
* ④.onExceptionResumeNext:
* 和onErrorResumeNext类似,可以说是onErrorResumeNext的特例,
* 区别是如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。
*/
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
try {
for (int i = 0; i < 10; i++) {
if (i > 3) {
// 如果不是Exception,错误会传递给观察者,不会开启备用Observable
throw new Throwable("i太大了");
// 如果Exception,不会将错误传递给观察者,并会开启备用Observable
// throw new Exception("i太大了哦哦哦");
}
emitter.onNext(i);
}
} catch (Throwable e) {
emitter.onError(e);
// emitter.onError(Exceptions.propagate(e));
}
emitter.onComplete();
}
})
.onExceptionResumeNext(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 10; i < 13; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}))
.subscribe(new Observer<Integer>() {
@Override
public void onError(Throwable e) {
String stackTraceString = Log.getStackTraceString(e);
Log.e(TAG, "①onExceptionResumeNext(ObservableSource)->onError:" + stackTraceString);
}
@Override
public void onComplete() {
Log.v(TAG, "①onExceptionResumeNext(ObservableSource)->onCompleted");
}
@Override
public void onSubscribe(Disposable d) {
Log.v(TAG, "①onExceptionResumeNext(ObservableSource)->onSubscribe:" + d.isDisposed());
}
@Override
public void onNext(Integer integer) {
Log.v(TAG, "①onExceptionResumeNext(ObservableSource)->onNext:" + integer);
}
});
}
结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 1、异常是Exception
07-18 18:47:16.920 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onSubscribe:false
①onExceptionResumeNext(ObservableSource)->onNext:0
07-18 18:47:16.921 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onNext:1
①onExceptionResumeNext(ObservableSource)->onNext:2
①onExceptionResumeNext(ObservableSource)->onNext:3
07-18 18:47:16.923 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onNext:10
①onExceptionResumeNext(ObservableSource)->onNext:11
①onExceptionResumeNext(ObservableSource)->onNext:12
①onExceptionResumeNext(ObservableSource)->onCompleted
// 2、异常是Throwable
07-18 18:50:40.731 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onSubscribe:false
①onExceptionResumeNext(ObservableSource)->onNext:0
①onExceptionResumeNext(ObservableSource)->onNext:1
07-18 18:50:40.732 338-338/me.hacket.assistant V/RxJava2: ①onExceptionResumeNext(ObservableSource)->onNext:2
①onExceptionResumeNext(ObservableSource)->onNext:3
07-18 18:50:40.732 338-338/me.hacket.assistant E/RxJava2: ①onExceptionResumeNext(ObservableSource)->onError:java.lang.Throwable: i太大了
xxxDelayError()
在进行一些合并操作时,如果碰到某个 Observable 发送了 Error 事件,则操作就会终止. 这时候如果需要先暂时忽略错误,将相应的操作进行完后再将发送 Error 事件,测可以用该方法对应的 DelayError
版本的方法.
很多函数都有提供 DelayError 版本的方法, 比如:combineLatestDelayError
,concatDelayError
, mergeDelayError
, concatMapDelayError
, switchMapDelayError
, switchOnNextDelayError
.
concatDelayError
Observable.concat 是将几个 Observable 的数据合并,如下所示,第一个 Observable 除了发射数据外,还会发射一个 Error,如果使用 concat, 则无法合并第二个 Observable 的内容.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable<String> obs1 = Observable.create(s -> {
s.onNext("a1");
s.onNext("a2");
s.onNext("a3");
s.onError(new Throwable("error from obs1"));
});
Observable<String> obs2 = Observable.just("b1", "b2", "b3");
Observable.concatDelayError(Arrays.asList(obs1, obs2)).subscribe(
System.out::println
, e -> System.out.println(e.getMessage())
, () -> System.out.println("onCompleted")
);