文章

过滤操作符

过滤操作符

filter 过滤 false 的数据

接受一个参数,让其过滤掉不符合我们条件的值

  • 返回 true 的数据往下走,返回 false 的数据被过滤掉

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void filterClick() {
    Observable.just(90, 1, 20, 65, -5, 7, 19)
            .filter(new Predicate<Integer>() {
                @Override
                public boolean test(@NonNull Integer integer) throws Exception {
                    return integer >= 10;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    sb.append("filter : " + integer + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                    Log.i(TAG, "filter : " + integer + "\n");
                }
            });
}

结果:

1
2
3
4
filter : 90
filter : 20
filter : 65
filter : 19

debounce 防抖

debounce 操作符介绍

debounce:防抖;
only emit an item from an Observable if a particular time-span has passed without it emitting another item,

对源 Observable 间隔期产生的结果进行过滤,如果在这个规定的间隔期内没有别的结果产生,则将这个结果提交给订阅者,否则忽略该结果,原理有点像光学防抖.

应用场景举例

去除发送频率过快的项
在 Edittext 上添加监听,当里面输入的内容变化后进行搜索。换句话说就是当用户的输入操作停止几秒钟之后再去搜索。如果用户一直在输入内容,那么一直不会触发搜索,直到用户输入完毕停止一定时间后才会去搜索,这个值就是 debounce 的参数值。
案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            emitter.onNext(1); // skip
            Thread.sleep(400);
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(100);
            emitter.onNext(4); // deliver
            Thread.sleep(605);
            emitter.onNext(5); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}

getObservable()
    // 设置时间为0.5秒
    .debounce(500, TimeUnit.MILLISECONDS)
    // Run on a background thread
    .subscribeOn(Schedulers.io())
    // Be notified on the main thread
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(getObserver());//这块观察者不重要
  • 输出结果:
1
2
3
07-19 18:04:30.808 27531-27531/me.hacket.assistant I/RxJava2: debounce :2
07-19 18:04:31.414 27531-27531/me.hacket.assistant I/RxJava2: debounce :4
07-19 18:04:32.019 27531-27531/me.hacket.assistant I/RxJava2: debounce :5
  • 分析:

第一个事件 1 发送出来以后过了 400 毫秒后发送出了第二个事件,此时不事件 1 不满足时间的条件被遗弃,然后重新计时; 2 发出后休眠了 505 毫秒,超过了 500 毫秒,所以 2 被发射了出来,被观察者收到; 3 发出来后又过了 100 毫秒 4 发出来,所以 3 被遗弃,从 4 重新计时,后又过了 605 毫秒下一个事件才发出,所以 4 被发射了出来; 同理,5 之后的 0.5 秒内也没有再发出别的事件,所以最终 5 也被发射了出来。

类似一个弹簧,如果一个事件相当于挤压它一下的话,它回到初始状态需要一段时间,那如果一直有事件不断的挤压它,那它一直回不到初始状态,就一个事件也弹不出来。一旦有一段时间里面没有人挤压它,他就把最后一个弹出来了。周而复始

distinct 过滤重复

案例:

1
2
3
4
5
6
7
8
9
Observable.just(0, 1, 1, 2, 23, 4, 5, 3, 0, 5)
        .distinct()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mTvOperatorResult1.append("distinct : " + integer + "\n");
                Log.i(TAG, "distinct : " + integer + "\n");
            }
        });

结果:

1
2
3
4
5
6
7
distinct : 0
distinct : 1
distinct : 2
distinct : 23
distinct : 4
distinct : 5
distinct : 3

skip 跳过 n 个数据

skip 很有意思,其实作用就和字面意思一样,接受一个 long 型参数 count ,代表跳过 count 个数目开始接收。
案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
private void skipClick() {
    Observable.just(1, 2, 3, 4, 5)
            .skip(2)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    sb.append("skip : " + integer + "," + System.currentTimeMillis() + "\n");
                    Log.i(TAG, "skip : " + integer + "," + System.currentTimeMillis() + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                }
            });
}

结果:

1
2
3
4
skip : 3,1531266462898
skip : 4,1531266462898
skip : 5,1531266462898

takeXXX

take

exytg
take,接受一个 long 型参数 count ,代表至多接收 count 个数据。
案例

1
2
3
4
5
6
7
8
9
10
11
12
private void takeClick() {
    Flowable.fromArray(1, 2, 3, 4, 5)
            .take(2)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    sb.append("take : " + integer + "\n");
                    Log.e(TAG, "accept: take : " + integer + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                }
            });
}

结果:

1
2
accept: take : 1
accept: take : 2

takeUntil

takeUntil

发射数据直到 stopPredicate 返回了 true
olkys

takeUntil

发射数据直到 other 开始发射数据
emptz
理解:
看一下官方的图片解释,B 发送 0 数据后,A 就停止发送数据了
可以 AObservable 监听另外一个 BObservable,如果 BObservable 开始发送数据,AObservable 就不再发送数据。

1
AObservable.takeUntil(BObservable)

案例:每隔 1 秒发射一个数据,定时 5 秒后停止发射数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
LogUtil.i(TAG, "accept:" + "," + getThreadName() + "--" + System.currentTimeMillis() + "\n");

Observable.interval(1, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io())
    .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Long>() {

        @Override

        public void accept(Long aLong) throws Exception {

            LogUtil.i(TAG, "accept:" + aLong + "," + getThreadName() + "--" + System.currentTimeMillis() + "\n");

            sb.append("accept:" + aLong + "," + getThreadName() + "--" + System.currentTimeMillis() + "\n");

            mTvOperatorResult1.setText(sb.toString());

        }

    });

效果:

1
2
3
4
5
6
7
8
9
10
07-16 15:19:53.083 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:,main--1531725593083

07-16 15:19:54.090 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:0,main--1531725594090

07-16 15:19:55.090 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:1,main--1531725595090

07-16 15:19:56.090 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:2,main--1531725596090

07-16 15:19:57.090 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:3,main--1531725597090

ignoreElements

7it1e
ignoreElements 操作符忽略所有源 Observable 产生的结果,只会执行 onCpmpleted() 或者 onError() 方法
配合 andThen,可以做后续的操作。
andThen,收到 onComplete 就会执行里面的 Observable

1
2
3
4
5
6
7
8
9
10
11
List<String> list = new ArrayList<>();
Observable.fromIterable(list)
        //在这里进行载入页面
        .doOnNext(item -> loadPage(item))
        //ignoreElements会提供给你一个Completable
        .ignoreElements()
        //andThen触发证明上游的Completable已经结束。onComplete触发,这是转而进行andThen里的操作
        .andThen(TestObservableUntis.getStringObservable())
        //进行其他操作
//                .subscribeOn(Schedulers.io())
        .subscribe(x -> System.out.println(x + "已经结束载入所有页面"))

ofType

ckaif

过滤只有指定类型 Class 可以通过,其实就是调用 filter 来过滤

1
2
3
4
public final <U> Observable<U> ofType(final Class<U> clazz) {
    ObjectHelper.requireNonNull(clazz, "clazz is null");
    return filter(Functions.isInstanceOf(clazz)).cast(clazz);
}

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static void testofType() {
    Father father = new Son();
    Father grandSon = new GrandSon();
    Son son = new Son();
    String s = "11";
    Integer i = 2;
    Observable.just(father, s, i, son, grandSon).ofType(GrandSon.class)
            .subscribe(new Consumer<GrandSon>() {
                @Override
                public void accept(GrandSon grandson) throws Exception {
                    System.out.println("grandson:" + grandson);
                }
            });
}

interface Father {
}

static class Son implements Father {

}

static class GrandSon extends Son {

}

结果:

1
grandson:com.example.rx.operators.TestofType$GrandSon@1175e2db

first

8gn3x
取第一个数据,或者是默认值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void button1Click() {
    Observable
//                .create(new ObservableOnSubscribe<Integer>() {
//                    @Override
//                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//                        emitter.onError(new RuntimeException("测试first exception!"));
//                        emitter.onNext(110);
//                        emitter.onNext(119);
//                        emitter.onNext(120);
//                        emitter.onComplete();
//                    }
//                })
            .empty()
            .first(404)
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(@NonNull Object integer) throws Exception {
                    sb.append("first : " + integer + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                    Log.i(TAG, "first : " + integer + "\n");
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    sb.append("first : " + throwable.getMessage() + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                    Log.i(TAG, "first : " + throwable.getMessage() + "\n");
                }
            });
}

last

last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。
last(defaultItem),默认值,如果 ObservableSource 为空。
案例:

1
2
3
4
5
6
7
8
9
10
11
12
private void lastClick() {
    Observable.just(1, 2, 3)
            .last(4)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    sb.append("last : " + integer + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                    Log.i(TAG, "last : " + integer + "\n");
                }
            });
}

结果:3

samplei7s40

sample 操作符是定期扫描源 Observable 产生的结果,在指定的间隔周期内进行采样

1
2
3
4
5
6
7
override fun button1Click() {
    Observable.interval(1, TimeUnit.SECONDS)
            .sample(2, TimeUnit.SECONDS)
            .subscribe {
                LogUtil.logw(TAG, "sample", "数据:$it")
            }
}

reduce 和 scan

reduce

zw767
reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。
案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void reduceClick() {
    Observable.just(1, 2, 3, 4, 5)
            .reduce(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                    return integer * integer2;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    sb.append("reduce : " + integer + "\n");
                    mTvOperatorResult1.setText(sb.toString());
                    Log.i(TAG, sb.toString());
                }
            });
}

结果:

1
120

scan

tlwre
scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出
案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void scanClick() {

    Observable.just(1, 2, 3, 4, 5)

            .scan(new BiFunction<Integer, Integer, Integer>() {

                @Override

                public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {

                    return integer * integer2;

                }

            })

            .subscribe(new Consumer<Integer>() {

                @Override

                public void accept(@NonNull Integer integer) throws Exception {

                    sb.append("reduce : " + integer + "\n");

                    mTvOperatorResult1.setText(sb.toString());

                    Log.i(TAG, sb.toString());

                }

            });

}

结果:

1
2
3
4
5
6
7
8
9
10
reduce : 1

reduce : 2

reduce : 6

reduce : 24

reduce : 120

throttleXXX

throttleFirst

本文由作者按照 CC BY 4.0 进行授权