RxJava原理
create 分析
create 无其他操作符,无切换线程原理分析
看看 create 的使用案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hacket");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("onNext:" + s);
}
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
System.out.println("onError:" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
输出:
1
2
3
onSubscribe
onNext:hacket
onComplete
要分析原理,我们分 2 部分来看看:Observable 的创建和 subscribe
首先看 Observable 的创建部分,Observable.create:
1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null"); // 判断source不为null
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
- 判断 source 不为 null
- RxJavaPlugins.onAssembly,RxJava 提供的全局 hook 操作符的入口,如果设置了保存在 onObservableAssembly
- new 了一个 ObservableCreate,并把 source 传递进去
source 是什么呢?
1
2
3
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
ObservableOnSubscribe 是一个拥有 subscribe 的接口,接收一个 ObservableEmitter 实例
下面看看 ObservableCreate:
1
2
3
4
5
6
7
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// ...
}
这里只是将 source 作为一个成员变量保存在 ObservableCreate 类中
到这里可以看到,create 操作就是创建一个 ObservableCreate 对象。
下面看 subscribe 订阅,从上面可以知道,返回的 Observable 就是一个 ObservableCreate 对象,那么 subscribe 也是调用的 ObservableCreate 的 subscribe() 方法,而 ObservableCreate 又是继承 Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Observable
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// 全局hook onSubscribe
observer = RxJavaPlugins.onSubscribe(this, observer)
// 最后调用subscribeActual,这个方式是个抽象的
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribe 做了如下操作:
- 回调全局的 onObservableSubscribe 并判断返回的 Observer 部位 null
- 调用抽象方法 subscribeActual,这个方法由具体子类实现,这里是 ObservableCreate
最终调用的是 ObservableCreate 的 subscribeActual:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 调用Observer的onSubscribe,传递parent
observer.onSubscribe(parent);
try {
source.subscribe(parent); // 就是我们代码写的匿名内部类的subscribe方法
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
// ...
}
}
- 创建了一个 CreateEmitter
- 回调 Observer 的 onSubscribe(Disposable) 方法,代表订阅成功了
- 调用 create 代码中写的匿名内部类的 subscribe 方法
在 ObservableOnSubscribe 的 subscribe 中我们调用了:
1
2
emitter.onNext("hacket");
emitter.onComplete();
其实就是调用了上面 CreateEmitter 的 onNext 和 onComplete 方法,最后调用的是 Observer 的 onNext 和 onComplete 方法。
RxJava 原理分析
RxJava 原理 - 简单的链式调用(无线程切换)
示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
System.out.println("subscribe > " + Thread.currentThread().getName());
emitter.onNext("test");
emitter.onComplete();
}
}).flatMap(new Function<String, ObservableSource<String>>() {
public ObservableSource<String> apply(@NonNull String s) throws Exception {
return Observable.just(s);
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return 0;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe > " + Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("onNext > " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError > " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete > " + Thread.currentThread().getName());
}
});
输出:
1
2
3
4
onSubscribe > main
subscribe > main
onNext > main
onComplete > main
数据源的包裹封装(正向)
正向数据源包裹封装:
1
ObservableOnSubscribe → ObservableCreate → ObservableFlatMap → ObservableMap
create(ObservableCreate)
先看 Observable.create,new 一个 ObservableCreate,保存 source(类型 ObservableOnSubscribe)为成员变量
1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
ObservableCreate 内部包含一个类型为 ObservableOnSubscribe<T>
的 source 变量,source 就是上面 create 操作符 new 出来的匿名内部类。现在看看 ObservableCreate:
1
2
3
4
5
6
7
8
9
10
11
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// ...
}
// ...
}
flatMap(ObservableFlatMap)
接下来看 flatMap,new 一个 ObservableFlatMap,
1
2
3
4
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
// ...
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
调用 flatMap 的为前面 create 创建出来的 ObservableCreate。现在看看 ObservableFlatMap:
1
2
3
4
5
6
7
8
9
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
// ...
public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
// ...
}
}
ObservableFlatMap 内部持有一个类型为 ObservableSource<T>
的 source 变量,而该 source 则是上一步中的 ObservableCreate 实例。ObservableFlatMap 内部还持有一个类型为 Function 的 mapper,这个就是上面代码 flatMap 的匿名内部类 Function<String,ObservableSource<String>>
。
map(ObservableMap)
最后是 map 操作符,new 了一个 ObservableMap
1
2
3
4
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
看看 ObservableMap:
1
2
3
4
5
6
7
8
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
// ...
}
看看 ObservableMap 内部持有一个类型为 ObservableSource<T>
的 source 变量,而该 source 则是上一步中的 ObservableFlatMap 实例。ObservableMap 内部还持有了一个类型为 Function 的 function,这个就是上面代码 map 的匿名内部类 Function<String,Integer>
。
到此,数据源的包裹封包完毕,下面是 subscribe 订阅部分。
小结
- 操作符 xxx,都存在着对应的
ObservableXxx
类,如 create 对应 ObservableCreate,map 对应 ObservableMap,flapMap 对应 ObservableFlatMap - create→flatMap→map 层层封装包裹;后面的操作符,持有前面操作符对应的实例;如 ObservableMap 持有的 source 为 ObservableCreate,ObservableMap 持有的 source 为 ObservableFlatMap
- 每个操作符对应的类都持有了各自的功能,如 create 是
ObservableOnSubscribe<T>
,被 Observer 订阅后,通过 ObservableEmitter 发送数据;flatMap 是Function<? super T, ? extends ObservableSource<? extends U>>
,输入为 T,输出为ObservableSource<U>
的 Function;map 是Function<? super T, ? extends U>
,输入为 T,输出为 U 的 Function
订阅数据源(逆向)subscribe
逆向订阅流程:
1
2
3
4
ObservableMap.subscribe(Observer) → // .subscribe(Observer)
ObservableFlatMap.subscribe(MapObserver) → // .map()
ObservableCreate.subscribe(MergerObserver) → // .flatMap
ObservableOnSubscribe.subscribe(CreateEmitter(MergerObserver)) // .create
以上的代码调用并没有出发数据的流转,只有当我们调用 subscribe
时才真正触发了 RxJava 的数据流。而调用 subscribe 的 Observable 是最后的 map 操作符对应的 ObservableMap。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 发生订阅的核心方法,这是一个抽象方法,由具体操作符对应的ObservableXxx实现
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
// ...
throw npe;
}
}
订阅调用的是 subscribeActual,执行 subscribeActual 的对象其实是 ObservableMap。
ObservableMap#subscribeActual
上面示例中调用 subscribe 后,调用的就是 ObservableMap#subscribeActual()
,现在看看 ObservableMap 的 subscribeActual:
1
2
3
4
5
6
7
8
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
// ...
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
// ...
}
- source 为前面分析的 ObservableFlatMap
- new 了一个 MapObserver,t 为上面示例代码中一个 Observer 匿名内部类,真正的 Observer;function 前面已经分析
- 调用了 subscribe,Observer 为 MapObserver,最终又调用到了 FlatMapObservable 的 subscribeActual
其实相当于 ObservableFlatMap.subscribe(MapObserver)
ObservableFlatMap#subscribeActual
现在看看 ObservableFlatMap#subscribeActual
:
1
2
3
4
5
6
7
8
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
// ...
public void subscribeActual(Observer<? super U> t) {
// ...
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
// ...
}
- source 为 ObservableCreate
- t 为 MapObserver
- 创建了一个 MergeObserver
其实相当于 ObservableCreate.subscribe(MergeObserver)
ObservableCreate#subscribeActual
现在看 ObservableCreate#subscribeActual
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
- source 为 ObservableOnSubscribe,即传递给 create 的匿名内部类
- observer 为 MergeObserver
相当于 ObservableOnSubscribe.subscribe(MergeObserver)
触发数据源产生原始数据,数据流转 onNext/onComplete
当订阅发生在最顶层时,也就是 ObservableCreate 中的 subscribeActual ,此时触发了数据源的产生,通过 emitter 发射数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final class ObservableCreate<T> extends Observable<T> {
// ...
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent); // 此时触发了onSubscribe回调
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// ...
}
调用 source.subscribe(parent),其实就是调用上面示例中的匿名内部类 ObservableOnSubscribe
1
2
3
4
5
6
7
8
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
System.out.println("subscribe > " + Thread.currentThread().getName());
emitter.onNext("test");
emitter.onComplete();
}
}
再来看 CreateEmitter 的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
// ...
if (!isDisposed()) {
observer.onNext(t); // 向下层分发数据
}
}
// ...
}
根据我们上面的分析 CreateEmitter 中持有的 observer 是 FlatMapObserver 的实例,而 FlatMapObserver 调用 onNext 时,又会调用 MapObserver 的 onNext ,依次调用至我们自己实现的观察者的 onNext 处理数据,此时数据流转完毕。
小结
操作符
对应产生的被观察者和观察者命名规则很有规律,比如说被观察者的命名Observable + 操作符
,例如 ObservableMap = Observable + map;观察者命名大多遵循操作符 + Observer
,例如 FlatMapObserver = flatMap + Observer。- 一个是按照代码顺序的操作符产生了一个一层层的数据源包裹(蓝色虚线的流程部分)
- 另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹(上部分的红色流程部分)
线程切换,subscribeOn,无 observeOn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
new Thread() {
@Override
public void run() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
System.out.println("subscribe > " + Thread.currentThread().getName());
emitter.onNext("test");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return 1;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe > " + Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("onNext > " + integer + " " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
System.out.println("onError > " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete > " + Thread.currentThread().getName());
}
});
}
}.start();
输出:
1
2
3
4
onSubscribe > Thread-0
subscribe > RxCachedThreadScheduler-1
onNext > 1 main
onComplete > main
数据源的包裹封装(正向)
1
ObservableOnSubscribe → ObservableCreate → ObservableSubscribeOn → ObservableMap → ObservableObserveOn
订阅数据源(逆向)subscribe
单个 subscribeOn 示例 1,subscribeOn 在最后
1
2
3
4
Observable.create()
.map()
.subscribeOn(Schedulers.newThread())
.subscribe()
订阅流程:
1
2
3
4
ObservableSubscribeOn.subscribe(Observer) ↓ // .subscribe() 在当前线程
ObservableMap.subscribe(SubscribeOnObserver) ↓ // subscribeOn 这里发生了线程切换,上游的订阅全部发生在该线程
ObservableCreate.subscribe(MapObserver) ↓ // .map()
ObservableOnSubscribe.subscribe(CreateEmitter(MapObserver)) // .create()
可以看到,在 subscribeOn 调用后,上游的 map 和 create 都是订阅在 newThread 线程。后续的事件发送都会在该线程
单个 subscribeOn 示例 2,subscribeOn 在中间
1
2
3
4
5
6
7
thread {
Observable.create()
.map() // 1
.subscribeOn(Schedulers.newThread())
.map() // 2
.subscribe()
}
输出:
1
2
3
4
5
6
onSubscribe > Thread-0
subscribe > RxNewThreadScheduler-1
map1 > RxNewThreadScheduler-1
map2 > RxNewThreadScheduler-1
onNext > 2 RxNewThreadScheduler-1
onComplete > RxNewThreadScheduler-1
- 数据源包裹封装
1
ObservableOnSubscribe → ObservableCreate → ObservableMap → ObservableSubscribeOn → ObservableMap
- 定义数据源
1
2
3
4
5
ObservableMap.subscribe(Observer) ↓ // .subscribe()
ObservableSubscribeOn.subscribe(MapObserver) ↓ // .map()
ObservableMap.subscribe(SubscribeOnObserver) ↓ // subscribeOn() 这里发生了线程切换
ObservableCreate.subscribe(MapObserver) ↓ // .map()
ObservableOnSubscribe.subscribe(CreateEmitter(MapObserver)) // .create()
可以看到,在 subscribeOn 调用后,上游的 map 和 create 都是订阅在 newThread 线程。后续的事件发送都会在该线程。可以看到 subscribeOn 写在哪里无所谓的。
数据流转 onNext/onComplete
create 数据源在什么线程,后面调用的 onNext 都是在这个线程中。
subscribeOn 总结
subscribeOn 只生效一次?为什么 subscribeOn 只有第一次有效?
subscribeOn 通过切换订阅线程,改变 Observable.create 所在线程,从而影响数据的发射线程。由于订阅过程自下而上,所以 Observable.create 只受最近一次 subscribeOn 影响,当调用链中有多个 subscribeOn 时只有第一个有效。其他 subscibeOn 仍然可以影响其上游的 doOnSubscribe
的执行线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
fun test() {
Observable.create<Unit> { emitter ->
log("onSubscribe")
emitter.onNext(Unit)
emitter.onComplete()
}.subscribeOn(namedScheduler("1 - subscribeOn"))
.doOnSubscribe { log("1 - doOnSubscribe") }
.subscribeOn(namedScheduler("2 - subscribeOn"))
.doOnSubscribe { log("2 - doOnSubscribe") }
.doOnNext { log("onNext") }
.test().awaitTerminalEvent() // Wait until observable completes
}
类似的问题:
- subscribeOn 是离数据源近的有效还是远的有效?
订阅是反向订阅,离数据源最近的有效,最近的会覆盖掉后面的 subscribeOn
- 为什么 subscribeOn 写在 map 前面和后面都是一样?
subscribeOn 只有一次有效,写哪里都一样
- 为什么 subscribeOn 写在 observeOn 也可以?
subscribeOn 用来决定订阅线程,但这并不意味着上游数据一定来自此线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
fun test() {
val observable = Observable.create<Int> { emitter ->
log("onSubscribe")
thread(name = "Main thread", isDaemon = false) {
log("1 - emitting"); emitter.onNext(1)
log("2 - emitting"); emitter.onNext(2)
log("3 - emitting"); emitter.onNext(3)
emitter.onComplete()
}
}
observable
.subscribeOn(Schedulers.computation())
.doOnNext { log("$it - after subscribeOn") }
.test().awaitTerminalEvent() // Wait until observable completes
}
subscribeOn 支持决定订阅线程,但这并不意味着上游数据一定来自此线程,这是因为发送数据 onNext/onComplete/onError 也在其他线程中运行。
对于 PublishSubject 无效
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
fun test() {
val subject = PublishSubject.create<Int>()
val observer1 = subject
.subscribeOn(Schedulers.io())
.doOnNext { log("$it - I want this happen on an IO thread") }
.test()
val observer2 = subject
.subscribeOn(Schedulers.newThread())
.doOnNext { log("$it - I want this happen on a new thread") }
.test()
sleep(10);
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onComplete()
observer1.awaitTerminalEvent()
observer2.awaitTerminalEvent()
}
对于 PublishSubject 来说,上游数据来自哪个线程是在 onNext 时决定的,所以对一个 PublishSubject 使用使用 subscribeOn 没有意义。
对于 Observable.just() 无效
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static void testJust() {
Observable.just(getJustData())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept :" + s + " " + Thread.currentThread().getName());
}
});
}
private static String getJustData() {
System.out.println("getJustData :" + Thread.currentThread().getName());
return "just data";
}
输出:
1
2
getJustData :main
accept :just data RxCachedThreadScheduler-1
如上,getJustData() 放在 just 中显然是不合适的。just() 在当前线程立即执行,因此不受 subscribeOn 影响,应该修改如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static void testJust2() {
Observable
.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just(getJustData());
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept :" + s + " " + Thread.currentThread().getName());
}
});
// 或者
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return getJustData();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept :" + s + " " + Thread.currentThread().getName());
}
});
}
输出:
1
2
getJustData :RxCachedThreadScheduler-1
accept :just data RxCachedThreadScheduler-1
使用 flatMap 处理并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static void testFlatMap() {
List<String> list = new ArrayList<>();
list.add("id1");
list.add("id2");
list.add("id3");
Observable.fromIterable(list)
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
System.out.println("flatMap apply " + s + "," + Thread.currentThread().getName());
return toUpperCase(s);
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("subscribe accept " + s + ", " + Thread.currentThread().getName());
}
});
}
private static Observable<String> toUpperCase(String data) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
System.out.println("toUpperCase create " + data + "," + Thread.currentThread().getName());
emitter.onNext(data.toUpperCase());
}
});
}
输出:
1
2
3
4
5
6
7
8
9
flatMap apply id1,RxCachedThreadScheduler-1
toUpperCase create id1,RxCachedThreadScheduler-1
subscribe accept ID1, RxCachedThreadScheduler-1
flatMap apply id2,RxCachedThreadScheduler-1
toUpperCase create id2,RxCachedThreadScheduler-1
subscribe accept ID2, RxCachedThreadScheduler-1
flatMap apply id3,RxCachedThreadScheduler-1
toUpperCase create id3,RxCachedThreadScheduler-1
subscribe accept ID3, RxCachedThreadScheduler-1
如果我们希望多个 toUpperCase(data) 并发执行,上述写法是错误的。
subscribeOn 决定了 flatMap 上游线程,flatMap 返回多个 Observable 的订阅都是发生在此线程,多个 toUpperCase 只能运行在单一线程,无法实现并行。
想要达到并行执行效果,需要修改如下:
1
2
3
4
5
6
7
8
9
private static Observable<String> toUpperCase(String data) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
System.out.println("toUpperCase create " + data + "," + Thread.currentThread().getName());
emitter.onNext(data.toUpperCase());
}
}).subscribeOn(Schedulers.newThread());
}
线程切换 observeOn
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private static void testObserveOn() {
new Thread() {
@Override
public void run() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
System.out.println("create subscribe > " + Thread.currentThread().getName());
new Thread() {
@Override
public void run() {
super.run();
emitter.onNext("create test");
emitter.onComplete();
}
}.start();
}
})
.subscribeOn(Schedulers.io())
.map(s -> {
System.out.println("map1 > " + Thread.currentThread().getName());
return 1;
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.map(s -> {
System.out.println("map2 > " + Thread.currentThread().getName());
return 2;
})
.observeOn(Schedulers.single())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe > " + Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("onNext > " + integer + " " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError > " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete > " + Thread.currentThread().getName());
}
});
}
}.start();
}
输出:
1
2
3
4
5
6
onSubscribe > Thread-0
create subscribe > RxCachedThreadScheduler-1 // .subscribeOn(IO)
map1 > Thread-1 // new Thread
map2 > RxComputationThreadPool-1 // .observeOn(Schedulers.computation())
onNext > 2 RxSingleScheduler-1 // .observeOn(Schedulers.single())
onComplete > RxSingleScheduler-1 // .observeOn(Schedulers.single())
代码简写:
1
2
3
4
5
6
7
8
Observable.create()
.subscribeOn(Schedulers.io())
.map()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.map()
.observeOn(Schedulers.single())
.subscribe()
订阅流程:
1
2
3
4
5
6
7
8
ObservableObserveOn.subscribe(Observer) ↓ // .subscribe()
ObservableMap.subscribe(ObserveOnObserver) ↓ // .observeOn(Schedulers.single()),Observer中onNext发送数据时实现了线程切换
ObservableObserveOn.subscribe(MapObserver) ↓ // .map()
ObservableSubscribeOn.subscribe(ObserveOnObserver) ↓ // .observeOn(Schedulers.computation()),MapObserver中onNext发送数据时实现了线程切换
ObservableMap.subscribe(SubscribeOnObserver) ↓ // .subscribeOn(Schedulers.newThread())
ObservableSubscribeOn.subscribe(MapObserver) ↓ // .map()
ObservableCreate.subscribe(SubscribeOnObserver) ↓ // .subscribeOn(Schedulers.io())
ObservableOnSubscribe.subscribe(CreateEmitter(SubscribeOnObserver)) // .create()
原理分析:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//ObservableObserveOn.java
final class ObservableObserveOn extends Observable<T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// 直接向上游订阅数据,不进行线程切换,切换操作在Observer中进行
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> implements Observer<T>, Runnable {
@Override
public void onNext(T t) {
if (done) {
return;
}
// 这里选把数据放到队列中,增加吞吐量,提高性能
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 在schedule方法里进行线程切换并把数据循环取出
// 回调给下游,下游会在指定的线程中收到数据
schedule();
}
void schedule() {
if (this.getAndIncrement() == 0) {
// 切换线程
this.worker.schedule(this);
}
}
}
}
observeOn 总结
observeOn 多次生效
不同于 subscribeOn,observeOn 可以有多个而且每个都会生效
- subscribeOn 切换的线程可以通过 doOnSubscribe 监听
- observeOn 切换的线程可以通过 doOnNext 监听
总结
- 创建 Observable 是一层包裹一层,后面的操作符会持有前面操作符对应的 Observable 对象
- 订阅时是个逆向的,从最后的操作符订阅
- subscribeOn 切换订阅时的线程,不能保证数据发送的线程就在该线程
- observeOn 切换数据发送 (onNext) 所在的线程
Ref
- 两张图彻底理解 RxJava2 的核心原理
http://solart.cc/2020/06/16/understand_rxjava2/