转换操作符
map 操作符
map 操作符,对 Observable 发射的每一项数据应用一个函数,执行变换操作,然后返回一个变换后结果的 Observable。map 操作符默认不在任何特定的调度器上执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void mapClick() {
new Thread() {
@Override
public void run() {
super.run();
Observable.just("hello,map")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
String s1 = s.toUpperCase();
sb.append(getThreadName()).append(" map1: ").append(s1).append("\n");
return s1;
}
})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
String s1 = s + " world!";
sb.append(getThreadName()).append(" map2: ").append(s1).append("\n");
return s1;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
sb.append(getThreadName()).append(" subscribe:" + s);
LogUtil.i(TAG, "subscribe :" + s);
}
});
}
}.start();
}
flatMap
flatMap 操作符使用一个指定的函数对原始 Observable 发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的 Observable,然后 flatMap 合并这些 Observables 发射的数据,最后将合并后的结果当作它自己的数据序列发射。
flatMap 对这些 Observable 发射的数据做的是合并操作,因此他们可能是交错的。有一个操作符不会让变换后的 Observable 发射的数据交错,它严格按照顺序发射这些数据,就是 concatMap。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private void flatMapClick() {
User user = new User();
user.userName = "hacket";
user.addresses = new ArrayList<>();
User.Address address1 = new User.Address();
address1.street = "麻布新村";
address1.city = "深圳市";
user.addresses.add(address1);
User.Address address2 = new User.Address();
address2.street = "泉沙村";
address2.city = "耒阳市";
user.addresses.add(address2);
// map操作符
// Observable.just(user)
// .map(new Function<User, List<User.Address>>() {
// @Override
// public List<User.Address> apply(User user) throws Exception {
// return user.addresses;
// }
// })
// .subscribe(new Consumer<List<User.Address>>() {
// @Override
// public void accept(List<User.Address> addresses) throws Exception {
// for (User.Address address : addresses) {
// sb.append(address.street + "\n");
// }
// }
// });
// flatMap操作符
Observable
.just(user)
.flatMap(new Function<User, ObservableSource<User.Address>>() {
@Override
public ObservableSource<User.Address> apply(User user) throws Exception {
sb.append("flatMap: " + user + "\n");
return Observable.fromIterable(user.addresses);
}
})
.subscribe(new Consumer<User.Address>() {
@Override
public void accept(User.Address address) throws Exception {
sb.append("subscribe: " + address.street + "\n");
}
});
}
buffer
buffer 操作符接受两个参数,buffer(count,skip),作用是将 Observable 中的数据按 skip(步长) 分成最大不超过 count 的 buffer ,然后生成一个 Observable。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
buffer(count,skip)
count 是每组一共有多少个发射, skip是每组之间发射的间隔
如: 1,2,3,4,5 buffer(3,2)
第一组 就是 1,2,3
第二组 就是 第一组基础上跳过2个发射的步长 也就是从3 开始 3,4,5
第三组 在第二组基础上 跳过2个发射步长 也就是从5开始 5
若buffer(4,1)
第一组 步长从零 1 2 3 4
第二组 步长+1 2 3 4 5
第三组 步长+1 3 4 5
第四组 步长+1 4 5
第五组 步长+1 5
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void bufferClick() {
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 2)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(@NonNull List<Integer> integers) throws Exception {
mTvOperatorResult1.append("buffer size : " + integers.size() + "\n");
Log.i(TAG, "buffer size : " + integers.size() + " ," + System.currentTimeMillis() + "\n");
mTvOperatorResult1.append("buffer value : ");
Log.i(TAG, "buffer value : " + " ," + System.currentTimeMillis() + "\n");
for (Integer i : integers) {
mTvOperatorResult1.append(i + " ," + System.currentTimeMillis() + "\n");
Log.i(TAG, i + " ," + System.currentTimeMillis() + "\n");
}
mTvOperatorResult1.append("\n");
Log.i(TAG, "\n");
}
});
}
结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
buffer size : 3 ,1531265916718
buffer value : ,1531265916719
1 ,1531265916719
2 ,1531265916719
3 ,1531265916719
buffer size : 3 ,1531265916719
buffer value : ,1531265916719
3 ,1531265916719
4 ,1531265916719
5 ,1531265916719
buffer size : 1 ,1531265916719
buffer value : ,1531265916719
5 ,1531265916719
groupBy 操作符
groupBy 操作符将一个 Observable 拆分为一些 Observables 集合,他们中的每一个都发射原始 Observable 的一个子序列。
哪个数据项由哪一个 Observable 发射是由一个函数判定的,这个函数给每一项指定一个 key,key 相同的数据会被同一个 Observable 发射,最终返回的是一个 GroupedObservable,它是一个抽象类,key 用于将数据分组到指定的 Observable。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void groupByClick() {
Observable.range(1, 9)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
String msg;
if (integer % 2 == 0) {
msg = "偶数组";
} else {
msg = "奇数组";
}
sb.append(getThreadName()).append(" groupBy :" + integer + "," + msg + "\n");
return msg;
}
})
.subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
String key = stringIntegerGroupedObservable.getKey();
sb.append(getThreadName() + " ")
.append(" subscribe:" + key + "\n");
stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
sb.append(" ---stringIntegerGroupedObservable subscribe:" + integer + "\n");
}
});
}
});
}
window 操作符
按照实际划分窗口,将数据发送给不同的 Observable
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private void windowClick() {
Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
.take(15) // 最多接收15个
.window(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(@NonNull Observable<Long> longObservable) throws Exception {
sb.append("Sub Divide begin...\n");
Log.e(TAG, "Sub Divide begin...\n");
longObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
sb.append("Next:" + aLong + "\n");
Log.e(TAG, "Next:" + aLong + "\n");
mTvOperatorResult1.setText(sb.toString());
}
});
}
});
}
结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
07-19 19:39:06.915 6488-6488/me.hacket.assistant E/RxJava2: Sub Divide begin...
07-19 19:39:07.898 6488-6488/me.hacket.assistant E/RxJava2: Next:0
07-19 19:39:08.898 6488-6488/me.hacket.assistant E/RxJava2: Next:1
07-19 19:39:09.898 6488-6488/me.hacket.assistant E/RxJava2: Sub Divide begin...
07-19 19:39:09.899 6488-6488/me.hacket.assistant E/RxJava2: Next:2
07-19 19:39:10.898 6488-6488/me.hacket.assistant E/RxJava2: Next:3
07-19 19:39:11.898 6488-6488/me.hacket.assistant E/RxJava2: Next:4
07-19 19:39:12.898 6488-6488/me.hacket.assistant E/RxJava2: Sub Divide begin...
07-19 19:39:12.899 6488-6488/me.hacket.assistant E/RxJava2: Next:5
07-19 19:39:13.898 6488-6488/me.hacket.assistant E/RxJava2: Next:6
07-19 19:39:14.898 6488-6488/me.hacket.assistant E/RxJava2: Next:7
07-19 19:39:15.898 6488-6488/me.hacket.assistant E/RxJava2: Sub Divide begin...
07-19 19:39:15.899 6488-6488/me.hacket.assistant E/RxJava2: Next:8
07-19 19:39:16.898 6488-6488/me.hacket.assistant E/RxJava2: Next:9
07-19 19:39:17.898 6488-6488/me.hacket.assistant E/RxJava2: Next:10
07-19 19:39:18.898 6488-6488/me.hacket.assistant E/RxJava2: Sub Divide begin...
07-19 19:39:18.899 6488-6488/me.hacket.assistant E/RxJava2: Next:11
07-19 19:39:19.898 6488-6488/me.hacket.assistant E/RxJava2: Next:12
07-19 19:39:20.898 6488-6488/me.hacket.assistant E/RxJava2: Next:13
07-19 19:39:21.898 6488-6488/me.hacket.assistant E/RxJava2: Sub Divide begin...
07-19 19:39:21.899 6488-6488/me.hacket.assistant E/RxJava2: Next:14
本文由作者按照 CC BY 4.0 进行授权