文章

RxJava相关问题

RxJava相关问题

RxJava

什么是 RxJava?

一个在 JVM 上使用可观测的序列来组成异步的、基于事件的程序的库。
举个例子说明:下载一张照片,用户点击下载,弹出正在下载提示框,下载结束显示图片,关闭提示框。
用 RxJava 实现简洁。
和观察者模式对比:观察者设计模式,起点是被观察者,终点是观察者,一条流水线的思维,响应式编程.

RxJava 原理?

  1. 构建链的阶段
  2. subscribe 阶段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        System.out.println("subscribe > " + Thread.currentThread().getName());
        emitter.onNext("test");
        emitter.onComplete();
    }
}).flatMap(new Function<String, ObservableSource<String>>() {
    public ObservableSource<String> apply(@NonNull String s) throws Exception {
        return Observable.just(s);
    }
}).map(new Function<String, Integer>() {
    @Override
    public Integer apply(@NonNull String s) throws Exception {
        return 0;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("onSubscribe > " + Thread.currentThread().getName());
    }
    
    @Override
    public void onNext(@NonNull Integer integer) {
        System.out.println("onNext > " + Thread.currentThread().getName());
    }
    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println("onError > " + Thread.currentThread().getName());
    }
    @Override
    public void onComplete() {
        System.out.println("onComplete > " + Thread.currentThread().getName());
    }
    });
  1. Observable 的创建,没有每个操作符 xxx 都对应一个 ObservableXxx(比如 create,对应 ObservableCreate),每次链式调用一个操作符,都是将前面的 Observable 作为 source 保存在当前 ObservableXxx 中

ObservableCreate 的 source 是 ObservableOnSubscribe,ObservableFlatMap 的 source 是 ObservableCreate,ObservableMap 的 source 是 ObservableFlatMap

  1. subscribe,调用的就是 Observable 的 subscribeActual,最后的操作符,优先 subscribe

ObservableMap.subscribeActual: ObservableFlatMap.subscribe(MapObserver(自定义的 Observer)) ObservableFlatMap.subscribeActual: ObservableCreate.subscribe(MergeObserver(MapObserver)) ObservableCreate.subscribeActual: ObservableOnSubscribe.subscribe(CreateEmitter(MergeObserver)) ObservableOnSubscribe.subscribe(): 调用 Emitter.onNext() 和 onComplete() Emitter.onNext : 调用的就是 MergerObserver 的 onNext

操作符的链式调用,其实就是前面的 ObservableXxx.subscribe 后面的 XXXObserver,每个 XxxObserver 又持有后续的 xxxObserver,最顶层的 CreateEmitter,调用的 Observer.onNext,然后就一层层的按照操作链式的顺序调用各个操作符对应的 Observer
image.png

RxJava 怎么切换线程的?

  1. 由于 Observable 的创建,是一层包裹一层的
  2. subscribeOn 对应 ObservableSubscribeOn,在其 subscribeActual 里,通过 SubscribeTask,它是一个 Runnable,在其 run 方法里,后续订阅都在在该 Scheduler 里,导致调用 onNext 时也是在该 Scheduler 里
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void subscribeActual(final Observer<? super T> observer) {
    scheduler.scheduleDirect(new SubscribeTask(parent))
}
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}
  1. observeOn 对应 ObservableObserveOn,在其 ObserveOnObserver,调用他的 onNext 时,是被 schedule 到指定的 Scheduler 中去的,而 observeOn 一般是放在 subscribe() 前一个,所以就会让你自己写的 Observer 的 onNext 就在指定的 Scheduler 运行,影响的是 observeOn 后续的代码线程运行

RxJava 中 flatmap 原理

本文由作者按照 CC BY 4.0 进行授权