文章

创建型操作符

创建型操作符

创建型操作符

操作符作用
create使用一个函数从头创建一个 Observable
just将一个或多个对象转换成发射这个或这些对象的一个 Observable
from将一个 Iterable、一个 Future 或者一个数组转换成一个 Observable
defer只有当订阅者订阅才创建 Observable,为每个订阅创建一个新的 Observable
range创建一个发射指定范围的整数序列的 Observable
interval创建一个按照给定的时间间隔发射整数序列的 Observable
timer创建一个在给定的延时之后发射单个数据的 Observable
empty创建一个什么都不 做直接通知完成的 Observable
error创建一个什么都不做直接通知错误的 Observable
never创建一个不发射任何数据的 Observable

create

create 案例

示例 1:未判断是否 isDisposed()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Disposable disposable = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    for (int i = 1; i <= 5; i++) {
        System.out.println("create next:" + i + " " + Thread.currentThread().getName());
        emitter.onNext(i);
        try {
            Thread.sleep(1000L);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    emitter.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(integer -> System.out.println("-->>subscribe next:" + integer + " " + Thread.currentThread().getName()), throwable -> System.out.println("-->>subscribe error:" + throwable.getMessage() + " " + Thread.currentThread().getName()), () -> System.out.println("-->>subscribe complete." + " " + Thread.currentThread().getName()));

结果:在 IO 线程中 create 中每隔一秒发射一个数据,在 newThread 中消费数据

create next:1 RxCachedThreadScheduler-1 –»subscribe next:1 RxNewThreadScheduler-1 create next:2 RxCachedThreadScheduler-1 –»subscribe next:2 RxNewThreadScheduler-1 create next:3 RxCachedThreadScheduler-1 –»subscribe next:3 RxNewThreadScheduler-1 create next:4 RxCachedThreadScheduler-1 –»subscribe next:4 RxNewThreadScheduler-1 create next:5 RxCachedThreadScheduler-1 –»subscribe next:5 RxNewThreadScheduler-1 –»subscribe complete. RxNewThreadScheduler-1

示例 2:未判断 isDisposed() 的情况,在中途 dispose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Disposable disposable = Observable
    .create((ObservableOnSubscribe<Integer>) emitter -> {
        for (int i = 1; i <= 5; i++) {
            System.out.println("create next:" + i + " " + Thread.currentThread().getName());
            emitter.onNext(i);
            try {
                Thread.sleep(1000L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        emitter.onComplete();
    }).subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .subscribe(
        integer -> System.out.println("-->>subscribe next:" + integer + " " + Thread.currentThread().getName()),
        throwable -> System.out.println("-->>subscribe error:" + throwable.getMessage() + " " + Thread.currentThread().getName()),
        () -> System.out.println("-->>subscribe complete." + " " + Thread.currentThread().getName())
    );
Disposable d = Observable.timer(2100, TimeUnit.MICROSECONDS).subscribe(aLong -> {
    System.out.println("2秒后dispose" + " " + Thread.currentThread().getName());
    disposable.dispose();
});

结果:发现 dispose 后,create 还在一直发射数据

create next:1 RxCachedThreadScheduler-1 –»subscribe next:1 RxNewThreadScheduler-1 2 秒后 dispose RxComputationThreadPool-1 create next:2 RxCachedThreadScheduler-1 create next:3 RxCachedThreadScheduler-1 java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) …… create next:4 RxCachedThreadScheduler-1 create next:5 RxCachedThreadScheduler-1

示例 3:判断是否 isDisposed(),避免无效的数据发射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static void testCreateDisposable() {
    Disposable disposable = Observable
            .create((ObservableOnSubscribe<Integer>) emitter -> {
                for (int i = 1; i <= 5; i++) {
                    if (!emitter.isDisposed()) {
                        System.out.println("create next:" + i + " " + Thread.currentThread().getName());
                        emitter.onNext(i);
                    }
                    try {
                        Thread.sleep(1000L);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                emitter.onComplete();
            }).subscribeOn(Schedulers.io())
        	.observeOn(Schedulers.newThread())
            .subscribe(
                    integer -> System.out.println("-->>subscribe next:" + integer + " " + Thread.currentThread().getName()),
                    throwable -> System.out.println("-->>subscribe error:" + throwable.getMessage() + " " + Thread.currentThread().getName()),
                    () -> System.out.println("-->>subscribe complete." + " " + Thread.currentThread().getName())
            );
    Disposable d = Observable.timer(2100, TimeUnit.MILLISECONDS).subscribe(aLong -> {
        System.out.println("2秒后dispose" + " " + Thread.currentThread().getName());
        disposable.dispose();
    });
}

结果:发射出 3 个数据后,dispose 了,就不再发射数据了

create next:1 RxCachedThreadScheduler-1 –»subscribe next:1 RxNewThreadScheduler-1 create next:2 RxCachedThreadScheduler-1 –»subscribe next:2 RxNewThreadScheduler-1 create next:3 RxCachedThreadScheduler-1 –»subscribe next:3 RxNewThreadScheduler-1 2 秒后 dispose RxComputationThreadPool-1 java.lang.InterruptedException: sleep interrupted

from

RxJava2.x 后,from() 没有了替换成了 fromArray(),fromFuture() 还是存在的
from 可以将其他种类的对象和数据类型转换为 Observable;可以将 Future、Iterator 和数组转换为 Observable;对于 Iterator 和数组,产生的 Observable 会发射 Iterator 或数组的每一项数据。
如果只有一个元素,调用的是 just;如果没有一个元素,调用的是 empty()
元素不能为空,否则 NPE

fromIterable

fromArray

发送一个数据,一个一个发送
实现的 Observable 是 ObservableFromArray

fromFuture

对于 Future,它会发射 Future.get() 方法返回的单个数据

有个重载的 3 个参数的方法,分别指定超时时长和时间单位,如果过了指定的时长,Future 还没有返回一个值,那么这个 Observable 就会发射错误通知并终止。

just

just 操作符介绍

一次可以发送一个或者多个
just 类似于 from,但是 from 会将数组或 Iterator 的数据取出然后逐个发送,而 just 只是简单得原样发射,将数组或 Iterator 当作单个数据。
它可以接受 1~10 个参数,返回一个按参数列表顺序发射这些数据的 Observable。
RxJava2,不能传递 null,否则会 NPE。

just

底层实现是调用的 fromArray()
一个参数的 just 实现是 ObservableJust
2 个参数及以上调用的是 fromArray()

just 注意

just 操作符时,不需要 subscribe 订阅也会立即执行

1
2
3
4
5
6
7
8
9
private static void testJust1() {
    System.out.println("from just");
    Observable.just(getRandomInteger());
}

public static Integer getRandomInteger() {
    System.out.println("generating Integer");
    return new Random().nextInt();
}

输出:

1
2
from just
generating Integer

没有进行订阅也执行了打印 “generating Integer”,而 Cold Observable 必须使用 subscribe() 才会生效

just 创建的不是一个 Cold Observable

1
2
3
4
5
6
7
8
9
10
11
12
private static void testJust2() {
    System.out.println("from Just");
    Observable justObservable = Observable.just(new Random().nextInt());
    justObservable.subscribe(System.out::println);
    justObservable.subscribe(System.out::println);

    System.out.println("\nfrom Callable");

    Observable callableObservable = Observable.fromCallable(() -> new Random().nextInt());
    callableObservable.subscribe(System.out::println);
    callableObservable.subscribe(System.out::println);
}

输出:

1
2
3
4
5
6
7
from Just
-1368455701
-1368455701

from Callable
-1746124833
209078358
  • 上述执行结果中 just 操作符创建的 Observable 即使被订阅多次,所产生的值依然保持不变。该值是从 Observable 外部生成的,而 Observable 仅将其存储以供以后使用。
  • just 可以立即执行,而 fromCallable 是延迟执行,必须等调用 subscribe 后才执行

just 和 fromCallable 分别调用 subscribeOn() 效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static void testJust3() {
    Observable.just(getRandomInteger("from just"))
            .subscribeOn(Schedulers.io())
            .subscribe(s -> log("Consume just: " + s));

    Observable.fromCallable(() -> getRandomInteger("from callable"))
            .subscribeOn(Schedulers.io())
            .subscribe(s -> log("Consume fromCallable: " + s));
}

public static Integer getRandomInteger(String prefix) {
    log(prefix + " generating Integer");
    return new Random().nextInt();
}

public static void log(String msg) {
    System.out.println("Current Thread Name:" + Thread.currentThread().getName() + ", " + msg);
}

输出:

1
2
3
4
Current Thread Name:main, from just generating Integer
Current Thread Name:RxCachedThreadScheduler-1, Consume just: -858536206
Current Thread Name:RxCachedThreadScheduler-2, from callable generating Integer
Current Thread Name:RxCachedThreadScheduler-2, Consume fromCallable: -1050044769
  • 使用 just 操作符时,getRandomInteger() 函数在 main 函数中运行。而使用 fromCallable 时,getRandomInteger() 函数是在 io 线程中运行
  • 因为 Hot Observable 是在订阅之前就创建了 Observable,所以使用 just 操作符后,getRandomInteger() 函数的调用并没有受到 subscribeOn() 的影响;而 fromCallable 是 Cold Observable 是发生订阅之后创建 Observable 的

defer

直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个全新的 Observable。

defer 操作符会一直等待直到有观察者订阅它,然后它使用 Observable 工厂方法生成一个 Observable。它对每个订阅者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个 Observable,但事实上每个订阅者获取的是他们自己单独的数据序列。

在某些情况下,直到最后一分钟(订阅发生时)才生成 Observable,以确保 Observable 包含最新的数据。

  1. 只有当订阅者订阅时才创建 Observable,为每个订阅者创建一个新的 Observable;默认的 Observable 获取时就创建了。
  2. 内部通过 ObservableDefer 在订阅时调用 Callable 的 call 方法创建 Observable

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final class RxJava2DeferTest {
    @Test
    public void testDefer() {
        SomeType instance = new SomeType();
        Observable<String> value = instance.valueObservableByDefer();
        instance.setValue("Some Value");
        value.subscribe(System.out::println); // Some Value
    }
    @Test
    public void testNoDefer() {
        SomeType instance = new SomeType();
        Observable<String> value = instance.valueObservable();
        instance.setValue("Some Value");
        value.subscribe(System.out::println); // default
    }
    static class SomeType {
        private String value = "default";
        public void setValue(String value) {
            this.value = value;
        }
        public Observable<String> valueObservable() {
            return Observable.just(value);
        }
        public Observable<String> valueObservableByDefer() {
            return Observable.defer(() -> Observable.just(value));
        }
    }
}

defer 原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final class ObservableDefer<T> extends Observable<T> {
    final Callable<? extends ObservableSource<? extends T>> supplier;
    public ObservableDefer(Callable<? extends ObservableSource<? extends T>> supplier) {
        this.supplier = supplier;
    }
    @Override
    public void subscribeActual(Observer<? super T> s) {
        ObservableSource<? extends T> pub;
        try {
            pub = ObjectHelper.requireNonNull(supplier.call(), "null ObservableSource supplied");
        } catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            EmptyDisposable.error(t, s);
            return;
        }
        pub.subscribe(s);
    }
}

ObservableDefer 保存了一个 Callable,在 ObservableDefer 被 subscribe 时,会将真正的 Observable 给 subscribe

timer


创建一个 Observable,它在一个给定的延迟后发射一个特殊的值。
timer 操作符创建一个在给定时间段之后一个特殊值的 Observable。默认调度器在computation

1
2
3
4
5
6
7
8
9
10
11
Observable.timer(2, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            LogUtil.i(TAG, "发射数据:" + System.currentTimeMillis());
            LogUtil.i(TAG, "timer :" + aLong);
            sb.append(aLong + "\n");
            mTvOperatorResult1.setText(sb.toString());
        }
    });

interval

创建一个按固定时间间隔发射整数序列的 Observable。
interval 操作符返回一个 Observable,它按固定的时间间隔发射一个无限递增的整数序列。

  • interval 默认在 computation 调度器上执行,线程数较少,容易出现阻塞
  • interval 的 subscribeOn 会无效,需要调用 interval(long period, TimeUnit unit, Scheduler scheduler) 来指定线程
1
2
3
4
5
6
7
8
Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            LogUtil.i(TAG, "interval :" + aLong);
            sb.append(aLong + "\n");
        }
    });

interval 的坑

  1. interval 的默认线程是 computation,有的华为手机不会调用,最好是换个线程;computation 线程数量是有限的,用完了的话容易出现阻塞导致任务不执行
  2. interval 操作符的线程调度看起来是不受 subscribeOn 影响的,里面默认指定了 computation 线程池;
本文由作者按照 CC BY 4.0 进行授权