创建型操作符
创建型操作符
操作符 | 作用 |
---|---|
create | 使用一个函数从头创建一个 Observable |
just | 将一个或多个对象转换成发射这个或这些对象的一个 Observable |
from | 将一个 Iterable、一个 Future 或者一个数组转换成一个 Observable |
defer | 只有当订阅者订阅才创建 Observable,为每个订阅创建一个新的 Observable |
range | 创建一个发射指定范围的整数序列的 Observable |
interval | 创建一个按照给定的时间间隔发射整数序列的 Observable |
timer | 创建一个在给定的延时之后发射单个数据的 Observable |
empty | 创建一个什么都不 做直接通知完成的 Observable |
error | 创建一个什么都不做直接通知错误的 Observable |
never | 创建一个不发射任何数据的 Observable |
create
create 案例
示例 1:未判断是否 isDisposed()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Disposable disposable = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 1; i <= 5; i++) {
System.out.println("create next:" + i + " " + Thread.currentThread().getName());
emitter.onNext(i);
try {
Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
}
emitter.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(integer -> System.out.println("-->>subscribe next:" + integer + " " + Thread.currentThread().getName()), throwable -> System.out.println("-->>subscribe error:" + throwable.getMessage() + " " + Thread.currentThread().getName()), () -> System.out.println("-->>subscribe complete." + " " + Thread.currentThread().getName()));
结果:在 IO 线程中 create 中每隔一秒发射一个数据,在 newThread 中消费数据
create next:1 RxCachedThreadScheduler-1 –»subscribe next:1 RxNewThreadScheduler-1 create next:2 RxCachedThreadScheduler-1 –»subscribe next:2 RxNewThreadScheduler-1 create next:3 RxCachedThreadScheduler-1 –»subscribe next:3 RxNewThreadScheduler-1 create next:4 RxCachedThreadScheduler-1 –»subscribe next:4 RxNewThreadScheduler-1 create next:5 RxCachedThreadScheduler-1 –»subscribe next:5 RxNewThreadScheduler-1 –»subscribe complete. RxNewThreadScheduler-1
示例 2:未判断 isDisposed() 的情况,在中途 dispose
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Disposable disposable = Observable
.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 1; i <= 5; i++) {
System.out.println("create next:" + i + " " + Thread.currentThread().getName());
emitter.onNext(i);
try {
Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
}
emitter.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(
integer -> System.out.println("-->>subscribe next:" + integer + " " + Thread.currentThread().getName()),
throwable -> System.out.println("-->>subscribe error:" + throwable.getMessage() + " " + Thread.currentThread().getName()),
() -> System.out.println("-->>subscribe complete." + " " + Thread.currentThread().getName())
);
Disposable d = Observable.timer(2100, TimeUnit.MICROSECONDS).subscribe(aLong -> {
System.out.println("2秒后dispose" + " " + Thread.currentThread().getName());
disposable.dispose();
});
结果:发现 dispose 后,create 还在一直发射数据
create next:1 RxCachedThreadScheduler-1 –»subscribe next:1 RxNewThreadScheduler-1 2 秒后 dispose RxComputationThreadPool-1 create next:2 RxCachedThreadScheduler-1 create next:3 RxCachedThreadScheduler-1 java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) …… create next:4 RxCachedThreadScheduler-1 create next:5 RxCachedThreadScheduler-1
示例 3:判断是否 isDisposed(),避免无效的数据发射
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static void testCreateDisposable() {
Disposable disposable = Observable
.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 1; i <= 5; i++) {
if (!emitter.isDisposed()) {
System.out.println("create next:" + i + " " + Thread.currentThread().getName());
emitter.onNext(i);
}
try {
Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
}
emitter.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(
integer -> System.out.println("-->>subscribe next:" + integer + " " + Thread.currentThread().getName()),
throwable -> System.out.println("-->>subscribe error:" + throwable.getMessage() + " " + Thread.currentThread().getName()),
() -> System.out.println("-->>subscribe complete." + " " + Thread.currentThread().getName())
);
Disposable d = Observable.timer(2100, TimeUnit.MILLISECONDS).subscribe(aLong -> {
System.out.println("2秒后dispose" + " " + Thread.currentThread().getName());
disposable.dispose();
});
}
结果:发射出 3 个数据后,dispose 了,就不再发射数据了
create next:1 RxCachedThreadScheduler-1 –»subscribe next:1 RxNewThreadScheduler-1 create next:2 RxCachedThreadScheduler-1 –»subscribe next:2 RxNewThreadScheduler-1 create next:3 RxCachedThreadScheduler-1 –»subscribe next:3 RxNewThreadScheduler-1 2 秒后 dispose RxComputationThreadPool-1 java.lang.InterruptedException: sleep interrupted
from
RxJava2.x 后,from() 没有了替换成了 fromArray(),fromFuture() 还是存在的
from 可以将其他种类的对象和数据类型转换为 Observable;可以将 Future、Iterator 和数组转换为 Observable;对于 Iterator 和数组,产生的 Observable 会发射 Iterator 或数组的每一项数据。
如果只有一个元素,调用的是 just;如果没有一个元素,调用的是 empty()
元素不能为空,否则 NPE
fromIterable
fromArray
发送一个数据,一个一个发送
实现的 Observable 是 ObservableFromArray
fromFuture
对于 Future,它会发射 Future.get() 方法返回的单个数据
有个重载的 3 个参数的方法,分别指定超时时长和时间单位,如果过了指定的时长,Future 还没有返回一个值,那么这个 Observable 就会发射错误通知并终止。
just
just 操作符介绍
一次可以发送一个或者多个
just 类似于 from,但是 from 会将数组或 Iterator 的数据取出然后逐个发送,而 just 只是简单得原样发射,将数组或 Iterator 当作单个数据。
它可以接受 1~10 个参数,返回一个按参数列表顺序发射这些数据的 Observable。
RxJava2,不能传递 null,否则会 NPE。
just
底层实现是调用的 fromArray()
一个参数的 just 实现是 ObservableJust
2 个参数及以上调用的是 fromArray()
just 注意
just 操作符时,不需要 subscribe 订阅也会立即执行
1
2
3
4
5
6
7
8
9
private static void testJust1() {
System.out.println("from just");
Observable.just(getRandomInteger());
}
public static Integer getRandomInteger() {
System.out.println("generating Integer");
return new Random().nextInt();
}
输出:
1
2
from just
generating Integer
没有进行订阅也执行了打印 “generating Integer”,而 Cold Observable 必须使用 subscribe() 才会生效
just 创建的不是一个 Cold Observable
1
2
3
4
5
6
7
8
9
10
11
12
private static void testJust2() {
System.out.println("from Just");
Observable justObservable = Observable.just(new Random().nextInt());
justObservable.subscribe(System.out::println);
justObservable.subscribe(System.out::println);
System.out.println("\nfrom Callable");
Observable callableObservable = Observable.fromCallable(() -> new Random().nextInt());
callableObservable.subscribe(System.out::println);
callableObservable.subscribe(System.out::println);
}
输出:
1
2
3
4
5
6
7
from Just
-1368455701
-1368455701
from Callable
-1746124833
209078358
- 上述执行结果中 just 操作符创建的 Observable 即使被订阅多次,所产生的值依然保持不变。该值是从 Observable 外部生成的,而 Observable 仅将其存储以供以后使用。
- just 可以立即执行,而 fromCallable 是延迟执行,必须等调用 subscribe 后才执行
just 和 fromCallable 分别调用 subscribeOn() 效果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static void testJust3() {
Observable.just(getRandomInteger("from just"))
.subscribeOn(Schedulers.io())
.subscribe(s -> log("Consume just: " + s));
Observable.fromCallable(() -> getRandomInteger("from callable"))
.subscribeOn(Schedulers.io())
.subscribe(s -> log("Consume fromCallable: " + s));
}
public static Integer getRandomInteger(String prefix) {
log(prefix + " generating Integer");
return new Random().nextInt();
}
public static void log(String msg) {
System.out.println("Current Thread Name:" + Thread.currentThread().getName() + ", " + msg);
}
输出:
1
2
3
4
Current Thread Name:main, from just generating Integer
Current Thread Name:RxCachedThreadScheduler-1, Consume just: -858536206
Current Thread Name:RxCachedThreadScheduler-2, from callable generating Integer
Current Thread Name:RxCachedThreadScheduler-2, Consume fromCallable: -1050044769
- 使用 just 操作符时,getRandomInteger() 函数在 main 函数中运行。而使用 fromCallable 时,getRandomInteger() 函数是在 io 线程中运行
- 因为 Hot Observable 是在订阅之前就创建了 Observable,所以使用 just 操作符后,getRandomInteger() 函数的调用并没有受到 subscribeOn() 的影响;而 fromCallable 是 Cold Observable 是发生订阅之后创建 Observable 的
defer
直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个全新的 Observable。
defer 操作符会一直等待直到有观察者订阅它,然后它使用 Observable 工厂方法生成一个 Observable。它对每个订阅者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个 Observable,但事实上每个订阅者获取的是他们自己单独的数据序列。
在某些情况下,直到最后一分钟(订阅发生时)才生成 Observable,以确保 Observable 包含最新的数据。
- 只有当订阅者订阅时才创建 Observable,为每个订阅者创建一个新的 Observable;默认的 Observable 获取时就创建了。
- 内部通过 ObservableDefer 在订阅时调用 Callable 的 call 方法创建 Observable
案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final class RxJava2DeferTest {
@Test
public void testDefer() {
SomeType instance = new SomeType();
Observable<String> value = instance.valueObservableByDefer();
instance.setValue("Some Value");
value.subscribe(System.out::println); // Some Value
}
@Test
public void testNoDefer() {
SomeType instance = new SomeType();
Observable<String> value = instance.valueObservable();
instance.setValue("Some Value");
value.subscribe(System.out::println); // default
}
static class SomeType {
private String value = "default";
public void setValue(String value) {
this.value = value;
}
public Observable<String> valueObservable() {
return Observable.just(value);
}
public Observable<String> valueObservableByDefer() {
return Observable.defer(() -> Observable.just(value));
}
}
}
defer 原理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final class ObservableDefer<T> extends Observable<T> {
final Callable<? extends ObservableSource<? extends T>> supplier;
public ObservableDefer(Callable<? extends ObservableSource<? extends T>> supplier) {
this.supplier = supplier;
}
@Override
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null ObservableSource supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
return;
}
pub.subscribe(s);
}
}
ObservableDefer 保存了一个 Callable,在 ObservableDefer 被 subscribe 时,会将真正的 Observable 给 subscribe
timer
创建一个 Observable,它在一个给定的延迟后发射一个特殊的值。
timer 操作符创建一个在给定时间段之后一个特殊值的 Observable。默认调度器在computation。
1
2
3
4
5
6
7
8
9
10
11
Observable.timer(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtil.i(TAG, "发射数据:" + System.currentTimeMillis());
LogUtil.i(TAG, "timer :" + aLong);
sb.append(aLong + "\n");
mTvOperatorResult1.setText(sb.toString());
}
});
interval
创建一个按固定时间间隔发射整数序列的 Observable。
interval 操作符返回一个 Observable,它按固定的时间间隔发射一个无限递增的整数序列。
- interval 默认在 computation 调度器上执行,线程数较少,容易出现阻塞
- interval 的 subscribeOn 会无效,需要调用
interval(long period, TimeUnit unit, Scheduler scheduler)
来指定线程
1
2
3
4
5
6
7
8
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtil.i(TAG, "interval :" + aLong);
sb.append(aLong + "\n");
}
});
interval 的坑
- interval 的默认线程是 computation,有的华为手机不会调用,最好是换个线程;computation 线程数量是有限的,用完了的话容易出现阻塞导致任务不执行
- interval 操作符的线程调度看起来是不受 subscribeOn 影响的,里面默认指定了 computation 线程池;