RxJava相关问题
RxJava
什么是 RxJava?
一个在 JVM 上使用可观测的序列来组成异步的、基于事件的程序的库。
举个例子说明:下载一张照片,用户点击下载,弹出正在下载提示框,下载结束显示图片,关闭提示框。
用 RxJava 实现简洁。
和观察者模式对比:观察者设计模式,起点是被观察者,终点是观察者,一条流水线的思维,响应式编程.
RxJava 原理?
- 构建链的阶段
- subscribe 阶段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
System.out.println("subscribe > " + Thread.currentThread().getName());
emitter.onNext("test");
emitter.onComplete();
}
}).flatMap(new Function<String, ObservableSource<String>>() {
public ObservableSource<String> apply(@NonNull String s) throws Exception {
return Observable.just(s);
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return 0;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe > " + Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("onNext > " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError > " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete > " + Thread.currentThread().getName());
}
});
- Observable 的创建,没有每个操作符 xxx 都对应一个 ObservableXxx(比如 create,对应 ObservableCreate),每次链式调用一个操作符,都是将前面的 Observable 作为 source 保存在当前 ObservableXxx 中
ObservableCreate 的 source 是 ObservableOnSubscribe,ObservableFlatMap 的 source 是 ObservableCreate,ObservableMap 的 source 是 ObservableFlatMap
- subscribe,调用的就是 Observable 的 subscribeActual,最后的操作符,优先 subscribe
ObservableMap.subscribeActual: ObservableFlatMap.subscribe(MapObserver(自定义的 Observer)) ObservableFlatMap.subscribeActual: ObservableCreate.subscribe(MergeObserver(MapObserver)) ObservableCreate.subscribeActual: ObservableOnSubscribe.subscribe(CreateEmitter(MergeObserver)) ObservableOnSubscribe.subscribe(): 调用 Emitter.onNext() 和 onComplete() Emitter.onNext : 调用的就是 MergerObserver 的 onNext
操作符的链式调用,其实就是前面的 ObservableXxx.subscribe 后面的 XXXObserver,每个 XxxObserver 又持有后续的 xxxObserver,最顶层的 CreateEmitter,调用的 Observer.onNext,然后就一层层的按照操作链式的顺序调用各个操作符对应的 Observer
RxJava 怎么切换线程的?
- 由于 Observable 的创建,是一层包裹一层的
- subscribeOn 对应 ObservableSubscribeOn,在其 subscribeActual 里,通过 SubscribeTask,它是一个 Runnable,在其 run 方法里,后续订阅都在在该 Scheduler 里,导致调用 onNext 时也是在该 Scheduler 里
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void subscribeActual(final Observer<? super T> observer) {
scheduler.scheduleDirect(new SubscribeTask(parent))
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
- observeOn 对应 ObservableObserveOn,在其 ObserveOnObserver,调用他的 onNext 时,是被 schedule 到指定的 Scheduler 中去的,而 observeOn 一般是放在 subscribe() 前一个,所以就会让你自己写的 Observer 的 onNext 就在指定的 Scheduler 运行,影响的是 observeOn 后续的代码线程运行