过滤操作符
filter 过滤 false 的数据
接受一个参数,让其过滤掉不符合我们条件的值
- 返回 true 的数据往下走,返回 false 的数据被过滤掉
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void filterClick() {
Observable.just(90, 1, 20, 65, -5, 7, 19)
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return integer >= 10;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
sb.append("filter : " + integer + "\n");
mTvOperatorResult1.setText(sb.toString());
Log.i(TAG, "filter : " + integer + "\n");
}
});
}
结果:
1
2
3
4
filter : 90
filter : 20
filter : 65
filter : 19
debounce 防抖
debounce 操作符介绍
debounce:防抖;
only emit an item from an Observable if a particular time-span has passed without it emitting another item,
对源 Observable 间隔期产生的结果进行过滤,如果在这个规定的间隔期内没有别的结果产生,则将这个结果提交给订阅者,否则忽略该结果,原理有点像光学防抖.
应用场景举例
去除发送频率过快的项
在 Edittext 上添加监听,当里面输入的内容变化后进行搜索。换句话说就是当用户的输入操作停止几秒钟之后再去搜索。如果用户一直在输入内容,那么一直不会触发搜索,直到用户输入完毕停止一定时间后才会去搜索,这个值就是 debounce 的参数值。
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
getObservable()
// 设置时间为0.5秒
.debounce(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());//这块观察者不重要
- 输出结果:
1
2
3
07-19 18:04:30.808 27531-27531/me.hacket.assistant I/RxJava2: debounce :2
07-19 18:04:31.414 27531-27531/me.hacket.assistant I/RxJava2: debounce :4
07-19 18:04:32.019 27531-27531/me.hacket.assistant I/RxJava2: debounce :5
- 分析:
第一个事件 1 发送出来以后过了 400 毫秒后发送出了第二个事件,此时不事件 1 不满足时间的条件被遗弃,然后重新计时; 2 发出后休眠了 505 毫秒,超过了 500 毫秒,所以 2 被发射了出来,被观察者收到; 3 发出来后又过了 100 毫秒 4 发出来,所以 3 被遗弃,从 4 重新计时,后又过了 605 毫秒下一个事件才发出,所以 4 被发射了出来; 同理,5 之后的 0.5 秒内也没有再发出别的事件,所以最终 5 也被发射了出来。
类似一个弹簧,如果一个事件相当于挤压它一下的话,它回到初始状态需要一段时间,那如果一直有事件不断的挤压它,那它一直回不到初始状态,就一个事件也弹不出来。一旦有一段时间里面没有人挤压它,他就把最后一个弹出来了。周而复始
distinct 过滤重复
案例:
1
2
3
4
5
6
7
8
9
Observable.just(0, 1, 1, 2, 23, 4, 5, 3, 0, 5)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mTvOperatorResult1.append("distinct : " + integer + "\n");
Log.i(TAG, "distinct : " + integer + "\n");
}
});
结果:
1
2
3
4
5
6
7
distinct : 0
distinct : 1
distinct : 2
distinct : 23
distinct : 4
distinct : 5
distinct : 3
skip 跳过 n 个数据
skip 很有意思,其实作用就和字面意思一样,接受一个 long 型参数 count ,代表跳过 count 个数目开始接收。
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
private void skipClick() {
Observable.just(1, 2, 3, 4, 5)
.skip(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
sb.append("skip : " + integer + "," + System.currentTimeMillis() + "\n");
Log.i(TAG, "skip : " + integer + "," + System.currentTimeMillis() + "\n");
mTvOperatorResult1.setText(sb.toString());
}
});
}
结果:
1
2
3
4
skip : 3,1531266462898
skip : 4,1531266462898
skip : 5,1531266462898
takeXXX
take
take,接受一个 long 型参数 count ,代表至多接收 count 个数据。
案例
1
2
3
4
5
6
7
8
9
10
11
12
private void takeClick() {
Flowable.fromArray(1, 2, 3, 4, 5)
.take(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
sb.append("take : " + integer + "\n");
Log.e(TAG, "accept: take : " + integer + "\n");
mTvOperatorResult1.setText(sb.toString());
}
});
}
结果:
1
2
accept: take : 1
accept: take : 2
takeUntil
takeUntil
takeUntil
发射数据直到 other 开始发射数据
理解:
看一下官方的图片解释,B 发送 0 数据后,A 就停止发送数据了
可以 AObservable 监听另外一个 BObservable,如果 BObservable 开始发送数据,AObservable 就不再发送数据。
1
AObservable.takeUntil(BObservable)
案例:每隔 1 秒发射一个数据,定时 5 秒后停止发射数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
LogUtil.i(TAG, "accept:" + "," + getThreadName() + "--" + System.currentTimeMillis() + "\n");
Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.takeUntil(Observable.timer(5, TimeUnit.SECONDS))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtil.i(TAG, "accept:" + aLong + "," + getThreadName() + "--" + System.currentTimeMillis() + "\n");
sb.append("accept:" + aLong + "," + getThreadName() + "--" + System.currentTimeMillis() + "\n");
mTvOperatorResult1.setText(sb.toString());
}
});
效果:
1
2
3
4
5
6
7
8
9
10
07-16 15:19:53.083 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:,main--1531725593083
07-16 15:19:54.090 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:0,main--1531725594090
07-16 15:19:55.090 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:1,main--1531725595090
07-16 15:19:56.090 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:2,main--1531725596090
07-16 15:19:57.090 23077-23077/me.hacket.assistant I/hacket.RxJava2: accept:3,main--1531725597090
ignoreElements
ignoreElements 操作符忽略所有源 Observable 产生的结果,只会执行 onCpmpleted() 或者 onError() 方法
配合 andThen
,可以做后续的操作。
andThen,收到 onComplete 就会执行里面的 Observable
1
2
3
4
5
6
7
8
9
10
11
List<String> list = new ArrayList<>();
Observable.fromIterable(list)
//在这里进行载入页面
.doOnNext(item -> loadPage(item))
//ignoreElements会提供给你一个Completable
.ignoreElements()
//andThen触发证明上游的Completable已经结束。onComplete触发,这是转而进行andThen里的操作
.andThen(TestObservableUntis.getStringObservable())
//进行其他操作
// .subscribeOn(Schedulers.io())
.subscribe(x -> System.out.println(x + "已经结束载入所有页面"))
ofType
过滤只有指定类型 Class 可以通过,其实就是调用 filter 来过滤
1
2
3
4
public final <U> Observable<U> ofType(final Class<U> clazz) {
ObjectHelper.requireNonNull(clazz, "clazz is null");
return filter(Functions.isInstanceOf(clazz)).cast(clazz);
}
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static void testofType() {
Father father = new Son();
Father grandSon = new GrandSon();
Son son = new Son();
String s = "11";
Integer i = 2;
Observable.just(father, s, i, son, grandSon).ofType(GrandSon.class)
.subscribe(new Consumer<GrandSon>() {
@Override
public void accept(GrandSon grandson) throws Exception {
System.out.println("grandson:" + grandson);
}
});
}
interface Father {
}
static class Son implements Father {
}
static class GrandSon extends Son {
}
结果:
1
grandson:com.example.rx.operators.TestofType$GrandSon@1175e2db
first
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void button1Click() {
Observable
// .create(new ObservableOnSubscribe<Integer>() {
// @Override
// public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// emitter.onError(new RuntimeException("测试first exception!"));
// emitter.onNext(110);
// emitter.onNext(119);
// emitter.onNext(120);
// emitter.onComplete();
// }
// })
.empty()
.first(404)
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object integer) throws Exception {
sb.append("first : " + integer + "\n");
mTvOperatorResult1.setText(sb.toString());
Log.i(TAG, "first : " + integer + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
sb.append("first : " + throwable.getMessage() + "\n");
mTvOperatorResult1.setText(sb.toString());
Log.i(TAG, "first : " + throwable.getMessage() + "\n");
}
});
}
last
last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。
last(defaultItem),默认值,如果 ObservableSource 为空。
案例:
1
2
3
4
5
6
7
8
9
10
11
12
private void lastClick() {
Observable.just(1, 2, 3)
.last(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
sb.append("last : " + integer + "\n");
mTvOperatorResult1.setText(sb.toString());
Log.i(TAG, "last : " + integer + "\n");
}
});
}
结果:3
sample
sample 操作符是定期扫描源 Observable 产生的结果,在指定的间隔周期内进行采样
1
2
3
4
5
6
7
override fun button1Click() {
Observable.interval(1, TimeUnit.SECONDS)
.sample(2, TimeUnit.SECONDS)
.subscribe {
LogUtil.logw(TAG, "sample", "数据:$it")
}
}
reduce 和 scan
reduce
reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void reduceClick() {
Observable.just(1, 2, 3, 4, 5)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer * integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
sb.append("reduce : " + integer + "\n");
mTvOperatorResult1.setText(sb.toString());
Log.i(TAG, sb.toString());
}
});
}
结果:
1
120
scan
scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void scanClick() {
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer * integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
sb.append("reduce : " + integer + "\n");
mTvOperatorResult1.setText(sb.toString());
Log.i(TAG, sb.toString());
}
});
}
结果:
1
2
3
4
5
6
7
8
9
10
reduce : 1
reduce : 2
reduce : 6
reduce : 24
reduce : 120