其他操作符
timeout
如果原始 Observable 过了指定的一段时长没有发射任何数据,Timeout 操作符会以一个 onError 通知终止这个 Observable,或者继续一个备用的 Observable。
RxJava 中的实现的 Timeout 操作符有好几个变体:
1
2
3
4
5
6
7
8
9
10
11
1. timeout(long,TimeUnit): 第一个变体接受一个时长参数,每当原始Observable发射了一项数据,timeout就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出TimeoutException,以一个错误通知终止Observable。 这个timeout默认在computation调度器上执行,你可以通过参数指定其它的调度器。
2. timeout(long,TimeUnit,Observable): 这个版本的timeout在超时时会切换到使用一个你指定的备用的Observable,而不是发错误通知。它也默认在computation调度器上执行。
3. timeout(Function):这个版本的timeout使用一个函数针对原始Observable的每一项返回一个Observable,如果当这个Observable终止时原始Observable还没有发射另一项数据,就会认为是超时了,timeout就抛出TimeoutException,以一个错误通知终止Observable。
4. timeout(Function,Observable): 这个版本的timeout同时指定超时时长和备用的Observable。它默认在immediate调度器上执行
5. timeout(Func0,Function):这个版本的time除了给每一项设置超时,还可以单独给第一项设置一个超时。它默认在immediate调度器上执行。
6. timeout(Func0,Function,Observable): 同上,但是同时可以指定一个备用的Observable。它默认在immediate调度器上执行。
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public void button1Click() {
_getObservableTask_2sToComplete()
.timeout(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtil.logi(TAG, "onNext", s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
LogUtil.logw(TAG, "onError", throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
LogUtil.logi(TAG, "onComplete", "onComplete");
}
});
}
private Observable<String> _getObservableTask_2sToComplete() {
return Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> subscriber) throws Exception {
LogUtil.logw(TAG, "sleep", String.format("Starting a 2s task"));
subscriber.onNext("1");
SystemClock.sleep(1000);
subscriber.onNext("2");
SystemClock.sleep(3000);
subscriber.onComplete();
}
});
}
结果:
1
2
3
4
W/hacket.rxjava2: 【sleep】Starting a 2s task,线程:RxComputationThreadPool-2,日期:2018-11-09 18:33:05
I/hacket.rxjava2: 【onNext】1,线程:main,日期:2018-11-09 18:33:05
I/hacket.rxjava2: 【onNext】2,线程:main,日期:2018-11-09 18:33:06
W/hacket.rxjava2: 【onError】null,线程:main,日期:2018-11-09 18:33:09
delay
让原始 Observable 在发射每项数据之前都暂停一段指定的时间段,结果是 Observable 发射的数据项在时间上整体延后一段时间
注意:delay 不会平移 onError 通知,它会立即将这个通知传递给订阅者,同时丢弃任何待发射的 onNext 通知。但是它会平移一个 onCompleted 通知。
delaySubscription
和 delay 不同的是,delaySubscription
是延迟订阅原始 Observable,这样也能达到数据延迟发射的效果
就是原始 Observable 该怎么发射消息还是怎么发射,因为只有订阅之后才会开始发射消息,所以延迟 2s。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 第二个订阅者延迟2s订阅,这将导致丢失前面2s内发射的数据
connectableObservable.delaySubscription(2, TimeUnit.SECONDS)
.as(bindLifecycle())
.subscribe(new Observer<Long>() {
@Override
public void onError(Throwable e) {
LogUtil.d(TAG, "【Observer3】onError");
}
@Override
public void onComplete() {
LogUtil.d(TAG, "【Observer3】onCompleted");
}
@Override
public void onSubscribe(Disposable d) {
LogUtil.d(TAG, "【Observer3】onSubscribe");
}
@Override
public void onNext(Long value) {
LogUtil.d(TAG, "【Observer3】onNext value :" + value);
}
});
using 操作符
Using 操作符指示 Observable 创建一个只在它的生命周期内存在的资源,当 Observable 终止时这个资源会被自动释放。
using 操作符接受三个参数:
- 一个用于 创建一次性资源的工厂函数
- 一个用于创建 Observable 的工厂函数
- 一个用于释放资源的函数
当一个观察者订阅 using 返回的 Observable 时,using 将会使用 Observable 工厂函数创建观察者要观察的 Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个 Observable 时,或者当观察者终止时(无论是正常终止还是因错误而终止),using 使用第三个函数释放它创建的资源。
典型的应用:DB 连接,socket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private fun executeUsingOperation() {
val resourceSupplier = Callable<Realm> { Realm() }
val sourceSupplier = Function<Realm, Publisher<Int>> { realm ->
Flowable.just(true)
.map {
realm.doSomething()
// i would use the copyFromRealm and change it to a POJO
Random().nextInt(50)
}
}
val disposer = Consumer<Realm> { realm ->
realm.clear()
}
Flowable.using(resourceSupplier, sourceSupplier, disposer)
.subscribe {
LogUtil.logi(TAG, "subscribe", "got a value $it - (look at the logs)")
}
}
inner class Realm {
init {
LogUtil.logd(TAG, "Realm init", "initializing Realm instance")
}
fun doSomething() {
LogUtil.logd(TAG, "Realm doSomething", "do something with Realm instance")
}
fun clear() {
// notice how this is called even before you manually "dispose"
LogUtil.logw(TAG, "Realm clear", "cleaning up the resources (happens before a manual 'dispose'")
}
}
结果:
1
2
3
4
D/hacket.rxjava2: 【Realm init】initializing Realm instance,线程:main,日期:2018-11-12 19:46:08
D/hacket.rxjava2: 【Realm doSomething】do something with Realm instance,线程:main,日期:2018-11-12 19:46:08
W/hacket.rxjava2: 【subscribe】got a value 19 - (look at the logs),线程:main,日期:2018-11-12 19:46:08
D/hacket.rxjava2: 【Realm clear】cleaning up the resources (happens before a manual 'dispose',线程:main,日期:2018-11-12 19:46:08
repeatXXX
操作符 repeat
创建一个发射特定数据重复多次的 Observable。
repat 不是创建一个 Observable,而是重复发射原始 Observable 的数据序列,这个序列或者是无限的,或者是通过 repeat(n) 指定的重复次数。
1
2
3
4
5
6
7
8
9
10
Observable.just("hello rxjava2 repat")
.repeat(3)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtil.i(TAG, "repat:" + s);
sb.append("repeat accept:" + s + "\n");
}
});
操作符 repeatUntil
repeatUntil 表示直到某个条件就不再重复发射数据。当 BooleanSupplier 的 getAsBoolean 返回 false 时,表示重复发射上游的 Observable;当返回 true 时,表示终止重复发射上游的 Observable。
操作符 repeatWhen
repeatWhen
不是缓存和重放原始 Observable 的数据序列,而是有条件地重新订阅和发射原来的 Observable。
repeatWhen 可以实现重订阅功能,而触发重订阅两个关键因素:
- Obervable 完成一次订阅,就是 Observable 调用 onComplete
- 当 Observable 调用 onComplete 就会进入到 repeatWhen 方法里面,是否要触发重订阅,就需要通过 repeatWhen 的 Function 方法所返回的 ObservableSource 确定,如果返回的是 onNext 则触发重订阅,而返回的是 onComplete/onError 则不会触发重订阅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private void repeatWhenTest() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("one");
emitter.onNext("two");
emitter.onNext("three");
// String i = null;
// i.toUpperCase();
emitter.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "doOnComplete 触发重订阅");
}
})
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
private int n = 0;
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
Log.v(TAG, "repeatWhen:" + o);
if (n < 3) {
n++;
return Observable.timer(3, TimeUnit.SECONDS);
} else {
return Observable.empty();
}
}
});
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
D: onNext: one
D: onNext: two
D: onNext: three
D: doOnComplete 触发重订阅
V: repeatWhen:0
D: onNext: one
D: onNext: two
D: onNext: three
D: doOnComplete 触发重订阅
V: repeatWhen:0
D: onNext: one
D: onNext: two
D: onNext: three
D: doOnComplete 触发重订阅
V: repeatWhen:0
D: onNext: one
D: onNext: two
D: onNext: three
D: doOnComplete 触发重订阅
V: repeatWhen:0
需要注意的是 repeatWhen 的 objectObservable 必须处理,这里使用 flathMap 进行处理,让它延时发出 onNext,这里 onNext 发出什么数据都不重要,它只是仅仅用来处理重订阅的通知,如果发出的是 onComplete/onError,则不会触发重订阅:
- doOnComplete 当 Observable 调用 onComplete 就会被触发
- Observable.time 延迟一段时间后,发送 onNext,数据为 0
- Observable.empty 发送 onComplete,Observer 不会回调 onComplete
- Observable.error 发送 onError,Observer 的 onError 会被回调,并接收错误信息