文章

RxJava原理

RxJava原理

create 分析

create 无其他操作符,无切换线程原理分析

看看 create 的使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("hacket");
            emitter.onComplete();
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            System.out.println("onSubscribe");
        }
        @Override
        public void onNext(@NonNull String s) {
            System.out.println("onNext:" + s);
        }
        @Override
        public void onError(@NonNull Throwable e) {
            e.printStackTrace();
            System.out.println("onError:" + e.getMessage());
        }
        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
}

输出:

1
2
3
onSubscribe
onNext:hacket
onComplete

要分析原理,我们分 2 部分来看看:Observable 的创建和 subscribe

首先看 Observable 的创建部分,Observable.create:

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null"); // 判断source不为null
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); 
}
  1. 判断 source 不为 null
  2. RxJavaPlugins.onAssembly,RxJava 提供的全局 hook 操作符的入口,如果设置了保存在 onObservableAssembly
  3. new 了一个 ObservableCreate,并把 source 传递进去

source 是什么呢?

1
2
3
public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

ObservableOnSubscribe 是一个拥有 subscribe 的接口,接收一个 ObservableEmitter 实例

下面看看 ObservableCreate:

1
2
3
4
5
6
7
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    // ...
}

这里只是将 source 作为一个成员变量保存在 ObservableCreate 类中

到这里可以看到,create 操作就是创建一个 ObservableCreate 对象。

下面看 subscribe 订阅,从上面可以知道,返回的 Observable 就是一个 ObservableCreate 对象,那么 subscribe 也是调用的 ObservableCreate 的 subscribe() 方法,而 ObservableCreate 又是继承 Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Observable
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // 全局hook onSubscribe
        observer = RxJavaPlugins.onSubscribe(this, observer)
        // 最后调用subscribeActual,这个方式是个抽象的
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

subscribe 做了如下操作:

  1. 回调全局的 onObservableSubscribe 并判断返回的 Observer 部位 null
  2. 调用抽象方法 subscribeActual,这个方法由具体子类实现,这里是 ObservableCreate

最终调用的是 ObservableCreate 的 subscribeActual:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 调用Observer的onSubscribe,传递parent
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent); // 就是我们代码写的匿名内部类的subscribe方法
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        // ...
    }
}
  1. 创建了一个 CreateEmitter
  2. 回调 Observer 的 onSubscribe(Disposable) 方法,代表订阅成功了
  3. 调用 create 代码中写的匿名内部类的 subscribe 方法

在 ObservableOnSubscribe 的 subscribe 中我们调用了:

1
2
emitter.onNext("hacket");
emitter.onComplete();

其实就是调用了上面 CreateEmitter 的 onNext 和 onComplete 方法,最后调用的是 Observer 的 onNext 和 onComplete 方法。

RxJava 原理分析

RxJava 原理 - 简单的链式调用(无线程切换)

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            System.out.println("subscribe > " + Thread.currentThread().getName());
            emitter.onNext("test");
            emitter.onComplete();
        }
    }).flatMap(new Function<String, ObservableSource<String>>() {
        public ObservableSource<String> apply(@NonNull String s) throws Exception {
            return Observable.just(s);
        }
    }).map(new Function<String, Integer>() {
        @Override
        public Integer apply(@NonNull String s) throws Exception {
            return 0;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            System.out.println("onSubscribe > " + Thread.currentThread().getName());
        }

        @Override
        public void onNext(@NonNull Integer integer) {
            System.out.println("onNext > " + Thread.currentThread().getName());
        }
        @Override
        public void onError(@NonNull Throwable e) {
            System.out.println("onError > " + Thread.currentThread().getName());
        }
        @Override
        public void onComplete() {
            System.out.println("onComplete > " + Thread.currentThread().getName());
        }
    });

输出:

1
2
3
4
onSubscribe > main
subscribe >  main
onNext >  main
onComplete >  main

整体代码执行流程:
image.png

数据源的包裹封装(正向)

正向数据源包裹封装:

1
ObservableOnSubscribe → ObservableCreate → ObservableFlatMap → ObservableMap

create(ObservableCreate)

先看 Observable.create,new 一个 ObservableCreate,保存 source(类型 ObservableOnSubscribe)为成员变量

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

ObservableCreate 内部包含一个类型为 ObservableOnSubscribe<T> 的 source 变量,source 就是上面 create 操作符 new 出来的匿名内部类。现在看看 ObservableCreate:

1
2
3
4
5
6
7
8
9
10
11
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // ...
    }
    // ...
}

flatMap(ObservableFlatMap)

接下来看 flatMap,new 一个 ObservableFlatMap,

1
2
3
4
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    // ...
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

调用 flatMap 的为前面 create 创建出来的 ObservableCreate。现在看看 ObservableFlatMap:

1
2
3
4
5
6
7
8
9
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    // ...
    public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;   
        // ...
    }
}

ObservableFlatMap 内部持有一个类型为 ObservableSource<T> 的 source 变量,而该 source 则是上一步中的 ObservableCreate 实例。ObservableFlatMap 内部还持有一个类型为 Function 的 mapper,这个就是上面代码 flatMap 的匿名内部类 Function<String,ObservableSource<String>>

map(ObservableMap)

最后是 map 操作符,new 了一个 ObservableMap

1
2
3
4
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

看看 ObservableMap:

1
2
3
4
5
6
7
8
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    // ...
}

看看 ObservableMap 内部持有一个类型为 ObservableSource<T> 的 source 变量,而该 source 则是上一步中的 ObservableFlatMap 实例。ObservableMap 内部还持有了一个类型为 Function 的 function,这个就是上面代码 map 的匿名内部类 Function<String,Integer>

到此,数据源的包裹封包完毕,下面是 subscribe 订阅部分。

小结

  1. 操作符 xxx,都存在着对应的 ObservableXxx 类,如 create 对应 ObservableCreate,map 对应 ObservableMap,flapMap 对应 ObservableFlatMap
  2. create→flatMap→map 层层封装包裹;后面的操作符,持有前面操作符对应的实例;如 ObservableMap 持有的 source 为 ObservableCreate,ObservableMap 持有的 source 为 ObservableFlatMap
  3. 每个操作符对应的类都持有了各自的功能,如 create 是 ObservableOnSubscribe<T>,被 Observer 订阅后,通过 ObservableEmitter 发送数据;flatMap 是 Function<? super T, ? extends ObservableSource<? extends U>>,输入为 T,输出为 ObservableSource<U> 的 Function;map 是 Function<? super T, ? extends U>,输入为 T,输出为 U 的 Function

订阅数据源(逆向)subscribe

逆向订阅流程:

1
2
3
4
ObservableMap.subscribe(Observer)   // .subscribe(Observer)
ObservableFlatMap.subscribe(MapObserver)   // .map()
ObservableCreate.subscribe(MergerObserver)  // .flatMap
ObservableOnSubscribe.subscribe(CreateEmitter(MergerObserver)) // .create

以上的代码调用并没有出发数据的流转,只有当我们调用 subscribe 时才真正触发了 RxJava 的数据流。而调用 subscribe 的 Observable 是最后的 map 操作符对应的 ObservableMap。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // 发生订阅的核心方法,这是一个抽象方法,由具体操作符对应的ObservableXxx实现
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        // ...
        throw npe;
    }
}

订阅调用的是 subscribeActual,执行 subscribeActual 的对象其实是 ObservableMap。

ObservableMap#subscribeActual

上面示例中调用 subscribe 后,调用的就是 ObservableMap#subscribeActual(),现在看看 ObservableMap 的 subscribeActual:

1
2
3
4
5
6
7
8
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    // ...
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    // ...
}
  1. source 为前面分析的 ObservableFlatMap
  2. new 了一个 MapObserver,t 为上面示例代码中一个 Observer 匿名内部类,真正的 Observer;function 前面已经分析
  3. 调用了 subscribe,Observer 为 MapObserver,最终又调用到了 FlatMapObservable 的 subscribeActual

其实相当于 ObservableFlatMap.subscribe(MapObserver)

ObservableFlatMap#subscribeActual

现在看看 ObservableFlatMap#subscribeActual

1
2
3
4
5
6
7
8
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    // ...
    public void subscribeActual(Observer<? super U> t) {
        // ...
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    // ...
}
  1. source 为 ObservableCreate
  2. t 为 MapObserver
  3. 创建了一个 MergeObserver

其实相当于 ObservableCreate.subscribe(MergeObserver)

ObservableCreate#subscribeActual

现在看 ObservableCreate#subscribeActual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    protected void subscribeActual(Observer<? super T> observer) {  
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    } 
}
  1. source 为 ObservableOnSubscribe,即传递给 create 的匿名内部类
  2. observer 为 MergeObserver

相当于 ObservableOnSubscribe.subscribe(MergeObserver)

触发数据源产生原始数据,数据流转 onNext/onComplete

当订阅发生在最顶层时,也就是 ObservableCreate 中的 subscribeActual ,此时触发了数据源的产生,通过 emitter 发射数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final class ObservableCreate<T> extends Observable<T> {
    // ...
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent); // 此时触发了onSubscribe回调
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // ...
}

调用 source.subscribe(parent),其实就是调用上面示例中的匿名内部类 ObservableOnSubscribe

1
2
3
4
5
6
7
8
new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        System.out.println("subscribe > " + Thread.currentThread().getName());
        emitter.onNext("test");
        emitter.onComplete();
    }
}

再来看 CreateEmitter 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    final Observer<? super T> observer;
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
    @Override
    public void onNext(T t) {
        // ...
        if (!isDisposed()) {
            observer.onNext(t);  // 向下层分发数据
        }
    }
    // ...
  }

根据我们上面的分析 CreateEmitter 中持有的 observer 是 FlatMapObserver 的实例,而 FlatMapObserver 调用 onNext 时,又会调用 MapObserver 的 onNext ,依次调用至我们自己实现的观察者的 onNext 处理数据,此时数据流转完毕。

小结

  1. 操作符 对应产生的被观察者和观察者命名规则很有规律,比如说被观察者的命名 Observable + 操作符 ,例如 ObservableMap = Observable + map;观察者命名大多遵循 操作符 + Observer ,例如 FlatMapObserver = flatMap + Observer。
  2. 一个是按照代码顺序的操作符产生了一个一层层的数据源包裹(蓝色虚线的流程部分)
  3. 另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹(上部分的红色流程部分)

线程切换,subscribeOn,无 observeOn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
new Thread() {
    @Override
    public void run() {
        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                        System.out.println("subscribe >  " + Thread.currentThread().getName());
                        emitter.onNext("test");
                        emitter.onComplete();
                    }
                })
                .subscribeOn(Schedulers.io())
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(@NonNull String s) throws Exception {
                        return 1;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        System.out.println("onSubscribe > " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        System.out.println("onNext >  " + integer + " " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        e.printStackTrace();
                        System.out.println("onError >  " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete >  " + Thread.currentThread().getName());
                    }
                });
    }
}.start();

输出:

1
2
3
4
onSubscribe > Thread-0
subscribe >  RxCachedThreadScheduler-1
onNext >  1 main
onComplete >  main

数据源的包裹封装(正向)

1
ObservableOnSubscribe  ObservableCreate  ObservableSubscribeOn  ObservableMap  ObservableObserveOn

订阅数据源(逆向)subscribe

单个 subscribeOn 示例 1,subscribeOn 在最后

1
2
3
4
Observable.create()
    .map()
    .subscribeOn(Schedulers.newThread())
     .subscribe()

订阅流程:

1
2
3
4
ObservableSubscribeOn.subscribe(Observer)  // .subscribe() 在当前线程
ObservableMap.subscribe(SubscribeOnObserver)  // subscribeOn 这里发生了线程切换,上游的订阅全部发生在该线程
ObservableCreate.subscribe(MapObserver)  // .map()
ObservableOnSubscribe.subscribe(CreateEmitter(MapObserver)) // .create()

可以看到,在 subscribeOn 调用后,上游的 map 和 create 都是订阅在 newThread 线程。后续的事件发送都会在该线程

单个 subscribeOn 示例 2,subscribeOn 在中间

1
2
3
4
5
6
7
thread {
    Observable.create()
        .map() // 1
        .subscribeOn(Schedulers.newThread())
        .map() // 2
        .subscribe()
}

输出:

1
2
3
4
5
6
onSubscribe > Thread-0
subscribe >  RxNewThreadScheduler-1
map1 > RxNewThreadScheduler-1
map2 > RxNewThreadScheduler-1
onNext >  2 RxNewThreadScheduler-1
onComplete >  RxNewThreadScheduler-1
  1. 数据源包裹封装
1
ObservableOnSubscribe  ObservableCreate  ObservableMap  ObservableSubscribeOn  ObservableMap
  1. 定义数据源
1
2
3
4
5
ObservableMap.subscribe(Observer)  // .subscribe()
ObservableSubscribeOn.subscribe(MapObserver)  // .map() 
ObservableMap.subscribe(SubscribeOnObserver)   // subscribeOn() 这里发生了线程切换
ObservableCreate.subscribe(MapObserver)  // .map()
ObservableOnSubscribe.subscribe(CreateEmitter(MapObserver)) // .create()

可以看到,在 subscribeOn 调用后,上游的 map 和 create 都是订阅在 newThread 线程。后续的事件发送都会在该线程。可以看到 subscribeOn 写在哪里无所谓的。

数据流转 onNext/onComplete

create 数据源在什么线程,后面调用的 onNext 都是在这个线程中。

subscribeOn 总结

subscribeOn 只生效一次?为什么 subscribeOn 只有第一次有效?

subscribeOn 通过切换订阅线程,改变 Observable.create 所在线程,从而影响数据的发射线程。由于订阅过程自下而上,所以 Observable.create 只受最近一次 subscribeOn 影响,当调用链中有多个 subscribeOn 时只有第一个有效。其他 subscibeOn 仍然可以影响其上游的 doOnSubscribe 的执行线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
fun test() {
    Observable.create<Unit> { emitter ->
        log("onSubscribe")
        emitter.onNext(Unit)
        emitter.onComplete()
    }.subscribeOn(namedScheduler("1 - subscribeOn"))
        .doOnSubscribe { log("1 - doOnSubscribe") }
        .subscribeOn(namedScheduler("2 - subscribeOn"))
        .doOnSubscribe { log("2 - doOnSubscribe") }
        .doOnNext { log("onNext") }
        .test().awaitTerminalEvent() // Wait until observable completes
}

image.png

类似的问题:

  1. subscribeOn 是离数据源近的有效还是远的有效?

订阅是反向订阅,离数据源最近的有效,最近的会覆盖掉后面的 subscribeOn

  1. 为什么 subscribeOn 写在 map 前面和后面都是一样?

subscribeOn 只有一次有效,写哪里都一样

  1. 为什么 subscribeOn 写在 observeOn 也可以?

subscribeOn 用来决定订阅线程,但这并不意味着上游数据一定来自此线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
fun test() {
    val observable = Observable.create<Int> { emitter ->
        log("onSubscribe")
        thread(name = "Main thread", isDaemon = false) {
            log("1 - emitting"); emitter.onNext(1)
            log("2 - emitting"); emitter.onNext(2)
            log("3 - emitting"); emitter.onNext(3)
            emitter.onComplete()
        }
    }
    observable
        .subscribeOn(Schedulers.computation())
        .doOnNext { log("$it - after subscribeOn") }
        .test().awaitTerminalEvent() // Wait until observable completes
}

image.png

subscribeOn 支持决定订阅线程,但这并不意味着上游数据一定来自此线程,这是因为发送数据 onNext/onComplete/onError 也在其他线程中运行。

对于 PublishSubject 无效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
fun test() {
    val subject = PublishSubject.create<Int>()
    val observer1 = subject
        .subscribeOn(Schedulers.io())
        .doOnNext { log("$it - I want this happen on an IO thread") }
        .test()
    val observer2 = subject
        .subscribeOn(Schedulers.newThread())
        .doOnNext { log("$it - I want this happen on a new thread") }
        .test()

    sleep(10); 
    subject.onNext(1)
    subject.onNext(2)
    subject.onNext(3)
    subject.onComplete()

    observer1.awaitTerminalEvent()
    observer2.awaitTerminalEvent()
}

image.png

对于 PublishSubject 来说,上游数据来自哪个线程是在 onNext 时决定的,所以对一个 PublishSubject 使用使用 subscribeOn 没有意义。

对于 Observable.just() 无效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static void testJust() {
    Observable.just(getJustData())
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("accept :" + s + " " + Thread.currentThread().getName());
            }
        });
}
private static String getJustData() {
    System.out.println("getJustData :" + Thread.currentThread().getName());
    return "just data";
}

输出:

1
2
getJustData :main
accept :just data RxCachedThreadScheduler-1

如上,getJustData() 放在 just 中显然是不合适的。just() 在当前线程立即执行,因此不受 subscribeOn 影响,应该修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static void testJust2() {
    Observable
            .defer(new Callable<ObservableSource<String>>() {
                @Override
                public ObservableSource<String> call() throws Exception {
                    return Observable.just(getJustData());
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("accept :" + s + " " + Thread.currentThread().getName());
                }
            });
    // 或者
    Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return getJustData();
            }
        })
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("accept :" + s + " " + Thread.currentThread().getName());
            }
        });
}

输出:

1
2
getJustData :RxCachedThreadScheduler-1
accept :just data RxCachedThreadScheduler-1

使用 flatMap 处理并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static void testFlatMap() {
    List<String> list = new ArrayList<>();
    list.add("id1");
    list.add("id2");
    list.add("id3");
    Observable.fromIterable(list)
            .flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull String s) throws Exception {
                    System.out.println("flatMap apply " + s + "," + Thread.currentThread().getName());
                    return toUpperCase(s);
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("subscribe accept " + s + ", " + Thread.currentThread().getName());
                }
            });
}
private static Observable<String> toUpperCase(String data) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            System.out.println("toUpperCase create " + data + "," + Thread.currentThread().getName());
            emitter.onNext(data.toUpperCase());
        }
    });
}

输出:

1
2
3
4
5
6
7
8
9
flatMap apply id1,RxCachedThreadScheduler-1
toUpperCase create id1,RxCachedThreadScheduler-1
subscribe accept ID1, RxCachedThreadScheduler-1
flatMap apply id2,RxCachedThreadScheduler-1
toUpperCase create id2,RxCachedThreadScheduler-1
subscribe accept ID2, RxCachedThreadScheduler-1
flatMap apply id3,RxCachedThreadScheduler-1
toUpperCase create id3,RxCachedThreadScheduler-1
subscribe accept ID3, RxCachedThreadScheduler-1

如果我们希望多个 toUpperCase(data) 并发执行,上述写法是错误的。

subscribeOn 决定了 flatMap 上游线程,flatMap 返回多个 Observable 的订阅都是发生在此线程,多个 toUpperCase 只能运行在单一线程,无法实现并行。

想要达到并行执行效果,需要修改如下:

1
2
3
4
5
6
7
8
9
private static Observable<String> toUpperCase(String data) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            System.out.println("toUpperCase create " + data + "," + Thread.currentThread().getName());
            emitter.onNext(data.toUpperCase());
        }
    }).subscribeOn(Schedulers.newThread());
}

线程切换 observeOn

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private static void testObserveOn() {
    new Thread() {
        @Override
        public void run() {
            Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                            System.out.println("create subscribe >  " + Thread.currentThread().getName());
                            new Thread() {
                                @Override
                                public void run() {
                                    super.run();
                                    emitter.onNext("create test");
                                    emitter.onComplete();
                                }
                            }.start();
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .map(s -> {
                        System.out.println("map1 > " + Thread.currentThread().getName());
                        return 1;
                    })
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(Schedulers.computation())
                    .map(s -> {
                        System.out.println("map2 > " + Thread.currentThread().getName());
                        return 2;
                    })
                    .observeOn(Schedulers.single())
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            System.out.println("onSubscribe > " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("onNext >  " + integer + " " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {
                            System.out.println("onError >  " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onComplete() {
                            System.out.println("onComplete >  " + Thread.currentThread().getName());
                        }
                    });
        }
    }.start();
}

输出:

1
2
3
4
5
6
onSubscribe > Thread-0
create subscribe >  RxCachedThreadScheduler-1  // .subscribeOn(IO)
map1 > Thread-1 // new Thread
map2 > RxComputationThreadPool-1 // .observeOn(Schedulers.computation())
onNext >  2 RxSingleScheduler-1 // .observeOn(Schedulers.single())
onComplete >  RxSingleScheduler-1 // .observeOn(Schedulers.single())

代码简写:

1
2
3
4
5
6
7
8
Observable.create()
    .subscribeOn(Schedulers.io())
    .map()
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .map()
    .observeOn(Schedulers.single())
    .subscribe()

订阅流程:

1
2
3
4
5
6
7
8
ObservableObserveOn.subscribe(Observer) ↓  // .subscribe()
ObservableMap.subscribe(ObserveOnObserver) ↓ // .observeOn(Schedulers.single()),Observer中onNext发送数据时实现了线程切换
ObservableObserveOn.subscribe(MapObserver) ↓ // .map()
ObservableSubscribeOn.subscribe(ObserveOnObserver) ↓ // .observeOn(Schedulers.computation()),MapObserver中onNext发送数据时实现了线程切换
ObservableMap.subscribe(SubscribeOnObserver) ↓ // .subscribeOn(Schedulers.newThread())
ObservableSubscribeOn.subscribe(MapObserver) ↓ // .map()
ObservableCreate.subscribe(SubscribeOnObserver) ↓ // .subscribeOn(Schedulers.io())
ObservableOnSubscribe.subscribe(CreateEmitter(SubscribeOnObserver)) // .create()

原理分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//ObservableObserveOn.java
final class ObservableObserveOn extends Observable<T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else { 
            Scheduler.Worker w = scheduler.createWorker();
            // 直接向上游订阅数据,不进行线程切换,切换操作在Observer中进行
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    static final class ObserveOnObserver<T> implements Observer<T>, Runnable {
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            // 这里选把数据放到队列中,增加吞吐量,提高性能
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // 在schedule方法里进行线程切换并把数据循环取出
            // 回调给下游,下游会在指定的线程中收到数据
            schedule();
        }
        void schedule() {
            if (this.getAndIncrement() == 0) {
                // 切换线程
                this.worker.schedule(this);
            }
        }
    }
}

observeOn 总结

observeOn 多次生效

不同于 subscribeOn,observeOn 可以有多个而且每个都会生效

  1. subscribeOn 切换的线程可以通过 doOnSubscribe 监听
  2. observeOn 切换的线程可以通过 doOnNext 监听

总结

  1. 创建 Observable 是一层包裹一层,后面的操作符会持有前面操作符对应的 Observable 对象
  2. 订阅时是个逆向的,从最后的操作符订阅
  3. subscribeOn 切换订阅时的线程,不能保证数据发送的线程就在该线程
  4. observeOn 切换数据发送 (onNext) 所在的线程 image.png

Ref

本文由作者按照 CC BY 4.0 进行授权