RxJava应用场景
RxJava2 应用场景
- 18 个 RxJava2 应用场景
https://github.com/kaushikgopal/RxJava-Android-Samples - RxJava 应用场景
https://blog.csdn.net/johnny901114/article/details/51568562 - RxJava1.x 使用场景小结 - 大头鬼
https://blog.csdn.net/lzyzsd/article/details/50120801 - RxJava2 实战知识梳理
https://www.jianshu.com/p/c935d0860186
RxBinding 实现 UI 响应式
1、 各种事件绑定
2、 结合 RxBinding 实现表单校验
结合 combineLatest,如注册的时候所有输入信息(邮箱、密码、电话号码等)合法才点亮注册按钮。
3、 过滤重复点击
结合 throttleFirst
4、 自定义 Listener 响应式
5、 响应式界面(CheckBox 更新和 sp 更新)
结合 RxBinding 和 rx-preference
6、buffer 结合 RxBinding 统计 n 秒内点击次数
buffer
网络请求相关
1、利用 concatXXX 实现多级缓存
内存、本地(文件、数据库)、网络
使用 concat 和 first 实现多级缓存.md
2、多个请求依赖,接口依赖 –flatmap
例如用户注册成功后需要自动登录,我们只需要先通过注册接口注册用户信息,注册成功后马上调用登录接口进行自动登录即可。
3、合并接口(结合多个接口更新 ui 界面)–zip
用 zip 来实现,多个 Observable 发射多个数据,切换线程保证发射一个,zip 一个,subscribe 一个。
4、网络轮询
schedulePeriodically
5、使用 merge 合并两个数据源,界面需要等到多个接口并发取完数据,再更新
拼接两个 Observable 的输出,不保证顺序,按照事件产生的顺序发送给订阅者
例如一组数据来自网络,一组数据来自文件,需要合并两组数据一起展示。
6、检测网络连接状态
PublicSubject
7、重试机制
retryWhen
其他
1、 搜索关键字提醒
结合 debounce。使用 debounce 做 textSearch
使用 debounce 做 textSearch.md
2、 替代 EventBus
RxBus
3、 定时和周期性
timer,interval
4、 数组和集合遍历
from
5、 解决嵌套回调(callback hell)问题
一个接口的请求依赖另一个 API 请求返回的数据,用 flatMap
6、 做缓存
concat、first。依次检查 memory、disk 和 network 中是否存在数据,任何一步一旦发现数据后面的操作都不执行。
7、Schedulers 做应用线程池
8、BehaviorSubject 预缓存
9、轮询操作
interval+take 或 repeatWhen
10、延迟工作
结合 zip
11、超时
timeout
12、App 登录退出登录响应
Subject? 用 RxBus 可能更好
combineLatest 结合 RxBinding 实现表单校验
- 操作符 combineLatest
- skip(1),因为 InitialValueObservable 订阅时,会发射一个默认的值
""
,用 skip 过滤掉默认的值,用skipInitialValue()
也更好 - combineLatest 必须每个 Observable 都至少发射一次数据项,以后就不需要了,所以 skipInitialValue 可以不用加,如果需要最开始对按钮进行置灰或者可用不可用操作。
Observable 版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
InitialValueObservable<CharSequence> enterPhoneObservable = RxTextView.textChanges(tv_bind_phone);
InitialValueObservable<CharSequence> enterAlipayAccountObservable = RxTextView.textChanges(et_alipay_account);
Observable
.combineLatest(enterPhoneObservable, enterAlipayAccountObservable, (phone, aliAccount) -> {
if (StringUtils.isEmpty(phone) || !ValidRegexUtils.isMobile(phone.toString())) {
LogUtil.w(TAG, "提现校验,绑定支付宝校验,phone为空或格式不对:" + phone + ",aliAccount:" + aliAccount);
return false;
}
if (StringUtils.isEmpty(aliAccount)) {
LogUtil.w(TAG, "提现校验,绑定支付宝校验,phone:" + phone + ",aliAccount格式不对:" + aliAccount);
return false;
}
int maxLength = 11;
if (aliAccount.length() > maxLength) {
CharSequence mobile = aliAccount.subSequence(0, maxLength);
LogUtil.w(TAG, "提现校验,绑定支付宝校验,aliAccount超过11位:" + aliAccount + ",截断:" + mobile);
et_alipay_account.setText(mobile);
et_alipay_account.setSelection(et_alipay_account.getText().length());
}
if (!ValidRegexUtils.isMobile(aliAccount.toString())) {
LogUtil.w(TAG, "绑定手机校验,绑定的phone:" + phone + ",aliAccount:" + aliAccount);
return false;
}
if (!StringUtils.isEquals(phone.toString(), aliAccount.toString())) {
return false;
}
return true;
})
.doOnDispose(() -> LogUtil.i(TAG, "提现校验,绑定支付宝校验,dispose"))
.as(bindLifecycle())
.subscribe(check -> ViewUtils.setEnable(btn_bind_go, check));
Flowable 版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
_emailChangeObservable =
RxTextView.textChanges(_email).skip(1).toFlowable(BackpressureStrategy.LATEST);
_passwordChangeObservable =
RxTextView.textChanges(_password).skip(1).toFlowable(BackpressureStrategy.LATEST);
_numberChangeObservable =
RxTextView.textChanges(_number).skip(1).toFlowable(BackpressureStrategy.LATEST);
private void _combineLatestEvents() {
_disposableObserver =
new DisposableSubscriber<Boolean>() {
@Override
public void onNext(Boolean formValid) {
if (formValid) {
_btnValidIndicator.setBackgroundColor(
ContextCompat.getColor(getContext(), R.color.blue));
} else {
_btnValidIndicator.setBackgroundColor(
ContextCompat.getColor(getContext(), R.color.gray));
}
}
@Override
public void onError(Throwable e) {
Timber.e(e, "there was an error");
}
@Override
public void onComplete() {
Timber.d("completed");
}
};
Flowable.combineLatest(
_emailChangeObservable,
_passwordChangeObservable,
_numberChangeObservable,
(newEmail, newPassword, newNumber) -> {
boolean emailValid = !isEmpty(newEmail) && EMAIL_ADDRESS.matcher(newEmail).matches();
if (!emailValid) {
_email.setError("Invalid Email!");
}
boolean passValid = !isEmpty(newPassword) && newPassword.length() > 8;
if (!passValid) {
_password.setError("Invalid Password!");
}
boolean numValid = !isEmpty(newNumber);
if (numValid) {
int num = Integer.parseInt(newNumber.toString());
numValid = num > 0 && num <= 100;
}
if (!numValid) {
_number.setError("Invalid Number!");
}
return emailValid && passValid && numValid;
})
.subscribe(_disposableObserver);
}
使用 debounce 做 textSearch
debounce:当 N 个结点发生的时间太靠近(即发生的时间差小于设定的值 T),debounce 就会自动过滤掉前 N-1 个结点。
比如在做百度地址联想的时候,可以使用 debounce 减少频繁的网络请求。避免每输入(删除)一个字就做一次联想
结合 RxBinding,在输入框变化时,没个 2S 做一次联网请求联想操作,可以避免每次输入都联网请求。
debounce()+RxBinding 做搜索时间过滤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
RxTextView.textChangeEvents(et_enter)
.debounce(2000, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<TextViewTextChangeEvent>() {
@Override
public void onError(Throwable e) {
Log.d(TAG, "Error");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
Log.d(TAG, format("开始联网搜索Searching for %s", onTextChangeEvent.text().toString()));
}
});
buffer 一段时间内的平均值
buffer 结合 RxBinding 统计 2 秒内点击次数
Observable<List> buffer(long timespan, TimeUnit unit)
收集 unit 时间内的 item 并发射
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
RxView.clicks(mButton18)
.buffer(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<List<Object>>() {
@Override
public void onNext(List<Object> objects) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < objects.size(); i++) {
sb.append(LogUtil.buildLogMsg("buffer", "点击了_" + i + "次"));
}
Log.e(TAG, sb.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
结果:
1
2
3
4
【buffer】点击了_0次,线程:main,日期:2018-11-07 11:16:07
【buffer】点击了_1次,线程:main,日期:2018-11-07 11:16:07
【buffer】点击了_2次,线程:main,日期:2018-11-07 11:16:07
【buffer】点击了_3次,线程:main,日期:2018-11-07 11:16:07
可参考:
https://github.com/kaushikgopal/RxJava-Android-Samples#2-accumulate-calls-using-buffer
buffer 计算一段时间内数据的平均值
应用场景
- 统计一段时间内的平均温度
- 糗百聊天室中,监听自己说话的音量,平均 1 秒说话音量大于一个值 A,那么降低背景音乐的音量;当 1 秒内说话音量小于 A 时,恢复背景音乐的音量
糗百聊天室单位时间内背景音乐降低实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class VoiceVolumeDetecter private constructor() {
companion object {
const val TAG = VoiceChatRoomActivity.TAG
const val BUFFER_TIMESPAN: Long = 1_000
const val BG_VOLOLE_BIAS: Float = 0.5F
const val YOUME_VOLUME_CALLBACK_LEVEL_LOW: Int = YoumeVoiceManager.YOUME_VOLUME_CALLBACK_LEVEL_LOW
@JvmStatic
fun newInstance(): VoiceVolumeDetecter {
return VoiceVolumeDetecter()
}
}
private val mPublishSubject: PublishSubject<Int> = PublishSubject.create()
private val mCompositeDisposable: CompositeDisposable = CompositeDisposable()
/**
* callback: true需要降低背景音乐,false不需要
*/
fun register(callback: Callback<Boolean>?): VoiceVolumeDetecter {
val disposableObserver = object : DisposableObserver<List<Int>>() {
override fun onComplete() {
}
override fun onNext(volumes: List<Int>) {
var avgVolume = 0
if (volumes.isNotEmpty()) {
for (d in volumes) {
avgVolume += d
}
avgVolume /= volumes.size
}
val savedBgMusicVolume = SPUtils.getInstance().getInt(BgMusicBottomDialog.KEY_BG_VOLUME)
if (avgVolume >= YOUME_VOLUME_CALLBACK_LEVEL_LOW) {
val bgMusicVolume = (savedBgMusicVolume * BG_VOLOLE_BIAS)
.roundToInt()
YoumeVoiceManager.setBackgroundMusicVolume(bgMusicVolume)
callback?.onSuccess(true)
LogUtils.w(
TAG, "2秒内说话音量$avgVolume 不小于$YOUME_VOLUME_CALLBACK_LEVEL_LOW" +
",当前背景音乐音量:${YoumeVoiceManager.getBackgroundMusicVolume()}," +
"更新youme背景音乐到当前50%:$bgMusicVolume"
)
} else {
callback?.onSuccess(false)
YoumeVoiceManager.setBackgroundMusicVolume(savedBgMusicVolume)
LogUtils.e(
TAG, "2秒内说话音量$avgVolume 小于$YOUME_VOLUME_CALLBACK_LEVEL_LOW" +
",恢复youme背景音乐到用户设置值:$savedBgMusicVolume"
)
}
}
override fun onError(e: Throwable) {
callback?.onFailed(Callback.commonErrorCode, "更新平均说话音量失败了")
}
}
mPublishSubject.buffer(BUFFER_TIMESPAN, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(disposableObserver)
mCompositeDisposable.add(disposableObserver)
return this
}
fun update(volume: Int) {
getSubject().onNext(volume)
}
private fun getSubject(): PublishSubject<Int> {
return mPublishSubject
}
fun unregister() {
if (!mCompositeDisposable.isDisposed) {
mCompositeDisposable.dispose()
}
}
}
统计一段时间温度变化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class CalcAverageActivity : AppCompatActivity() {
private val mPublishSubject: PublishSubject<Int>? = PublishSubject.create()
private var mCompositeDisposable: CompositeDisposable? = null
private var mSourceHandler: SourceHandler? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(me.hacket.assistant.R.layout.activity_calc_average)
val disposableObserver = object : DisposableObserver<List<Int>>() {
override fun onNext(o: List<Int>) {
var result = 0.0
if (o.size > 0) {
for (d in o) {
result += d
}
result = result / o.size
}
LogUtil.d("BufferActivity", "更新平均温度:$result")
tv_average.setText("过去3秒收到了" + o.size + "个数据, 平均温度为:" + result)
}
override fun onError(throwable: Throwable) {
}
override fun onComplete() {
}
}
mPublishSubject?.buffer(3000, TimeUnit.MILLISECONDS)
?.observeOn(AndroidSchedulers.mainThread())
?.subscribe(disposableObserver)
mCompositeDisposable = CompositeDisposable()
mCompositeDisposable?.add(disposableObserver)
// 开始测量温度。
mSourceHandler = SourceHandler()
mSourceHandler?.sendEmptyMessage(0)
}
fun updateTemperature(temperature: Int) {
LogUtil.d("BufferActivity", "温度测量结果:$temperature")
mPublishSubject?.onNext(temperature)
}
private inner class SourceHandler : Handler() {
override fun handleMessage(msg: Message) {
super.handleMessage(msg)
val temperature = (Math.random() * 25 + 5).toInt()
updateTemperature(temperature)
//循环地发送。
sendEmptyMessageDelayed(0, 250 + (250 * Math.random()).toLong())
}
}
}
Ref
- RxJava2 实战知识梳理 (2) - 计算一段时间内数据的平均值
https://www.jianshu.com/p/5dd01b14c02a
检测网络连接状态
PublishProcessor
distinctUntilChanged()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class NetworkDetectorAct extends BaseActivity {
@BindView(R.id.tv_network)
TextView tv_network;
private BroadcastReceiver broadcastReceiver;
private PublishProcessor<Boolean> publishProcessor;
private Disposable disposable;
@Override
public int getLayoutResId() {
return R.layout.activity_network_detector;
}
@Override
public void onStart() {
super.onStart();
publishProcessor = PublishProcessor.create();
disposable = publishProcessor
.startWith(getConnectivityStatus(this))
.distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
online -> {
if (online) {
String msg = LogUtil.buildLogMsg("网络监测", "You are online\n");
LogUtil.i(msg);
tv_network.append(msg);
} else {
String msg = LogUtil.buildLogMsg("网络监测", "You are offline\n");
LogUtil.e(msg);
tv_network.append(msg);
}
});
listenToNetworkConnectivity();
}
@Override
public void onStop() {
super.onStop();
disposable.dispose();
unregisterReceiver(broadcastReceiver);
}
private void listenToNetworkConnectivity() {
broadcastReceiver = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
publishProcessor.onNext(getConnectivityStatus(context));
}
};
final IntentFilter intentFilter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
registerReceiver(broadcastReceiver, intentFilter);
}
private boolean getConnectivityStatus(Context context) {
ConnectivityManager cm =
(ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo networkInfo = cm.getActiveNetworkInfo();
return networkInfo != null && networkInfo.isConnected();
}
}
zip 和 merge
zip
使用 zip 合并多个接口并行
合并接口(结合多个接口更新 ui 界面)
用 zip 来实现,多个 Observable 发射多个数据,切换线程保证发射一个,zip 一个,subscribe 一个。
zip 会将多个数据源合并为一个
推迟做一些工作
zip 将多个 Observable 相应位置上的数据 (用 item 更贴切) 按指定的函数合成一个结果,然后重新形成一个新的 Observable。
比如 Observable1 发射了第一个数据,那么必须等到 Observable2 发射了第一个数据,才能按指定的函数生成新的 Observable 的第一个数据.
利用这个特性,我们可以做一些推迟的工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.zip(
Observable.timer(5, TimeUnit.SECONDS)
.doOnNext(it -> System.out.println("timer:" + it)),
Observable.just(doSomething()),
(x, y) -> y)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
sb.append("zip : accept : " + o + "\n");
Log.e(TAG, "zip : accept : " + o + "\n");
mTvOperatorResult1.setText(sb.toString());
}
});
private Object doSomething() {
sb.append("zip延迟发射数据");
Log.e(TAG, "zip延迟发射数据");
return "zip发射数据";
}
merge
见缓存的 merge
缓存
场景 1:先缓存数据源,再网络数据源 –concat
先内存缓存,然后磁盘缓存,最后网络缓存,最后合并起来
这种用 concat 操作符就可以
- concat 最大缺点
我们白白浪费了前面读取缓存的这段时间,能不能同时发起读取缓存和网络的请求,而不是等到读取缓存完毕之后,才去请求网络呢(一个个串行执行,后面的需要等待前面的完成)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.concat(CacheDemoUtils.getMemory(), CacheDemoUtils.getDisk(), CacheDemoUtils.getNetwork())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : CacheObserver() {
override fun onStart() {
super.onStart()
LogUtil.logd("rxjava2", "onStart", "开始了")
}
override fun onNext(t: MutableList<CacheDemoUtils.User>) {
LogUtil.logi("rxjava2", "getDatas", "获取到数据:" + t.size + ",来源:" + t[0].from)
bindDatas(t)
}
override fun onError(e: Throwable) {
super.onError(e)
LogUtil.logw("rxjava2", "onError", e.message)
}
override fun onComplete() {
super.onComplete()
LogUtil.logw("rxjava2", "onComplete", "onComplete")
}
})
场景 2:同时取缓存数据源、网络数据源 –concatEager
缓存数据源和网络数据源同时取数据,如果网络源取数据要快于缓存数据源,那么先缓存起来等缓存数据源数据取到,然后再发射网络源数据
- concatEager 缺点
在某些异常情况下,如果读取缓存的时间要大于网络请求的时间,那么就会导致出现 “ 网络请求的结果 “ 等待 “ 读取缓存 “ 这一过程完成后才能传递给下游,白白浪费了一段时间
和 concat 相比,concatEager 可以同时请求,后面快的 Observable 可以缓存起来等待前面的 Observable,然后再一起合并
- 缺点
后面快的 Observable 需要等待前面的 Observable,浪费了等待时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.concatArrayEager(CacheDemoUtils.getMemory().subscribeOn(Schedulers.io()), CacheDemoUtils.getDisk().subscribeOn(Schedulers.io()), CacheDemoUtils.getNetwork().subscribeOn(Schedulers.io()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : CacheObserver() {
override fun onStart() {
super.onStart()
LogUtil.logd("rxjava2", "onStart", "开始了")
}
override fun onNext(t: MutableList<CacheDemoUtils.User>) {
LogUtil.logi("rxjava2", "getDatas", "获取到数据:" + t.size + ",来源:" + t[0].from)
bindDatas(t)
}
override fun onError(e: Throwable) {
super.onError(e)
LogUtil.logw("rxjava2", "onError", e.message)
}
override fun onComplete() {
super.onComplete()
LogUtil.logw("rxjava2", "onComplete", "onComplete")
}
})
结果:disk 和 network 比 memory 快,需要等待 memory 完成并发射数据后,再发射他们的数据
1
2
3
4
5
6
7
8
D/hacket.rxjava2: 【onStart】开始了,线程:main,日期:2018-11-09 16:02:10
I/hacket.rxjava2: 【getMemory】开始从内存获取数据,需要耗时:20000,线程:RxCachedThreadScheduler-6,日期:2018-11-09 16:02:10
I/hacket.rxjava2: 【getDisk】开始从磁盘获取数据,需要耗时:5000,线程:RxCachedThreadScheduler-7,日期:2018-11-09 16:02:10
I/hacket.rxjava2: 【getNetwork】开始从网络获取数据,需要耗时:10000,线程:RxCachedThreadScheduler-8,日期:2018-11-09 16:02:10
I/hacket.rxjava2: 【getDatas】获取到数据:20,来源:memory,线程:main,日期:2018-11-09 16:02:30
I/hacket.rxjava2: 【getDatas】获取到数据:15,来源:disk,线程:main,日期:2018-11-09 16:02:30
I/hacket.rxjava2: 【getDatas】获取到数据:30,来源:network,线程:main,日期:2018-11-09 16:02:30
W/hacket.rxjava2: 【onComplete】onComplete,线程:main,日期:2018-11-09 16:02:30
场景 3:依次从内存缓存、磁盘缓存、网络缓存中取数据,哪里取到数据了,就终止 – concat+first
concat+first
- concat 将所有的 Observable 发射的数据合并起来
- first 取第一个数据
- 首先,这个只适合一个 Item 的时候。如果我们有多个 Item 从这个 Observable 中流出。 first() 操作符只会取第一个。
concat+first,也就是说依次检查 memory、disk 和 network 是否存在数据(需要有 onNext,onComplete 算没数据),一旦其中一个有数据,后续不执行
依次检查 memory、disk 和 network 中是否存在数据,任何一步一旦发现数据后面的操作都不执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void onViewClicked() {
Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("从内存获取缓存数据");
}
});
Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String cachePref = RxPreferencesUtils.newInstance(getApplicationContext())
.rxSharedPreferences().getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
emitter.onNext(cachePref);
} else {
emitter.onComplete();
}
}
});
Observable<String> network = Observable.just("network");
//依次检查memory、disk、network
Observable.concat(memory, disk, network)
.first("默认初始化数据")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
String s1 = LogUtil.buildLogMsg("concat-subscribe", "--------------subscribe: " + s);
LogUtil.logi(TAG, "concat-subscribe", "--------------subscribe: " + s);
mTextView12.setText(s1);
});
}
需要注意的是如果 memorySource, diskSource, networkSource 返回的都 null,那么会报一个异常:java.util.NoSuchElementException: Sequence contains no elements
用 rxjava2.x 试,好像没这个问题。
concat+takeFirst
用 takeFirst 操作,即使都没有数据,也不会报异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.concat(memorySource, diskSource, networkSource)
// first()-> if no data from observables will cause exception :
// java.util.NoSuchElementException: Sequence contains no elements
// takeFirst -> no exception
.takeFirst(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s != null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
printLog(tvLogs, "Getting data from ", s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
printLog(tvLogs, "Error: ", throwable.getMessage());
}
});
场景 4:从内存缓存、磁盘缓存、网络获取数据,哪个先来就先发射数据 –merge
它和 concatEager 一样,会让多个 Observable 同时开始发射数据,但是它不需要 Observable 之间的互相等待,而是直接发送给下游。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.merge(CacheDemoUtils.getMemory().subscribeOn(Schedulers.io()), CacheDemoUtils.getDisk().subscribeOn(Schedulers.io()), CacheDemoUtils.getNetwork().subscribeOn(Schedulers.io()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : CacheObserver() {
override fun onStart() {
super.onStart()
LogUtil.logd("rxjava2", "onStart", "开始了")
}
override fun onNext(t: MutableList<CacheDemoUtils.User>) {
LogUtil.logi("rxjava2", "getDatas", "获取到数据:" + t.size + ",来源:" + t[0].from)
bindDatas(t)
}
override fun onError(e: Throwable) {
super.onError(e)
LogUtil.logw("rxjava2", "onError", e.message)
}
override fun onComplete() {
super.onComplete()
LogUtil.logw("rxjava2", "onComplete", "onComplete")
}
})
结果:哪个先快的 Observable,先发射数据
1
2
3
4
5
6
7
8
D/hacket.rxjava2: 【onStart】开始了,线程:main,日期:2018-11-09 16:04:43
I/hacket.rxjava2: 【getMemory】开始从内存获取数据,需要耗时:20000,线程:RxCachedThreadScheduler-10,日期:2018-11-09 16:04:43
I/hacket.rxjava2: 【getDisk】开始从磁盘获取数据,需要耗时:5000,线程:RxCachedThreadScheduler-11,日期:2018-11-09 16:04:43
I/hacket.rxjava2: 【getNetwork】开始从网络获取数据,需要耗时:10000,线程:RxCachedThreadScheduler-12,日期:2018-11-09 16:04:43
I/hacket.rxjava2: 【getDatas】获取到数据:15,来源:disk,线程:main,日期:2018-11-09 16:04:48
I/hacket.rxjava2: 【getDatas】获取到数据:30,来源:network,线程:main,日期:2018-11-09 16:04:53
I/hacket.rxjava2: 【getDatas】获取到数据:20,来源:memory,线程:main,日期:2018-11-09 16:05:03
W/hacket.rxjava2: 【onComplete】onComplete,线程:main,日期:2018-11-09 16:05:03
场景 5:同时发起读取缓存、访问网络的请求,如果缓存的数据先回来,那么就先展示缓存的数据,而如果网络的数据先回来,那么就不再展示缓存的数据。 publish+merge+takeUntil
同时发起读取缓存、访问网络的请求,如果缓存的数据先回来,那么就先展示缓存的数据,而如果网络的数据先回来,那么就不再展示缓存的数据
publish + merge + takeUntil
- 使用 publish 目的:
调用 merge 和 takeUntil 会发生两次订阅,这时候就需要使用 publish 操作符,它接收一个 Function 函数,该函数返回一个 Observable,该 Observable 是对原 Observable,也就是上面网络源的 Observable 转换之后的结果,该 Observable 可以被 takeUntil 和 merge 操作符所共享,从而实现只订阅一次的效果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CacheDemoUtils.getNetwork()
.publish(network -> {
return Observable.merge(network, CacheDemoUtils.getDisk().subscribeOn(Schedulers.io()).takeUntil(network),
CacheDemoUtils.getMemory().subscribeOn(Schedulers.io()).takeUntil(network));
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new CacheObserver() {
@Override
public void onNext(List<CacheDemoUtils.User> users) {
LogUtil.logi("rxjava2", "getDatas", "获取到数据:" + users.size() + ",来源:" + users.get(0).from);
bindDatas(users);
}
@Override
public void onError(Throwable e) {
super.onError(e);
LogUtil.logw("rxjava2", "onError", e.getMessage());
}
@Override
public void onComplete() {
super.onComplete();
LogUtil.logw("rxjava2", "onComplete", "onComplete");
}
});
效果:
1
2
3
4
5
6
I/hacket.rxjava2: 【getMemory】开始从内存获取数据,需要耗时:20000,线程:RxCachedThreadScheduler-11,日期:2018-11-09 17:40:54
I/hacket.rxjava2: 【getNetwork】开始从网络获取数据,需要耗时:10000,线程:RxCachedThreadScheduler-9,日期:2018-11-09 17:40:54
I/hacket.rxjava2: 【getDisk】开始从磁盘获取数据,需要耗时:5000,线程:RxCachedThreadScheduler-10,日期:2018-11-09 17:40:54
I/hacket.rxjava2: 【getDatas】获取到数据:15,来源:disk,线程:main,日期:2018-11-09 17:40:59
I/hacket.rxjava2: 【getDatas】获取到数据:30,来源:network,线程:main,日期:2018-11-09 17:41:04
W/hacket.rxjava2: 【onComplete】onComplete,线程:main,日期:2018-11-09 17:41:04
问题: 如果网络请求先返回时发生了错误(例如没有网络等)导致发送了 onError 事件,从而使得缓存的 Observable 也无法发送事件,最后界面显示空白。
针对这个问题,我们需要对网络的 Observable 进行优化,让其不将 onError 事件传递给下游。其中一种解决方式是通过使用 onErrorResume 操作符,它可以接收一个 Func 函数,其形参为网络发送的错误,而在上游发生错误时会回调该函数。我们可以根据错误的类型来返回一个新的 Observable,让订阅者镜像到这个新的 Observable,并且忽略 onError 事件,从而避免 onError 事件导致整个订阅关系的结束。
这里为了避免订阅者在镜像到新的 Observable 时会收到额外的时间,我们返回一个 Observable.never(),它表示一个永远不发送事件的上游。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static Observable<List<User>> getNetwork() {
return Observable
.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> emitter) throws Exception {
long t = 10000;
LogUtil.logi("rxjava2", "getNetwork", "开始从网络获取数据,需要耗时:" + t);
for (int i = 0; i < 10; i++) {
SystemClock.sleep(t / 10);
if (i == 3) {
String s = null;
s.toUpperCase();
}
}
// emitter.onComplete();
List<User> users = new ArrayList<>();
for (int i = 50; i < 80; i++) {
users.add(new User("eason_" + i, "male", 0 + i, "network"));
}
if (!emitter.isDisposed()) {
emitter.onNext(users);
}
emitter.onComplete();
}
})
.onErrorResumeNext(throwable -> {
return Observable.never();
});
}
使用 switchIfEmpty 操作符实现 Android 检查本地缓存逻辑判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//从本地数据获取文章列表
getArticlesObservable(pageIndex, pageSize, categoryId)
//本地不存在,请求api
.switchIfEmpty(articleApi.getArticlesByCategoryId(pageIndex + "", pageSize + "", categoryId + "")
.compose(this.<RespArticlePaginate>handlerResult())
.flatMap(new Func1<RespArticlePaginate, Observable<RespArticlePaginate>>() {
@Override
public Observable<RespArticlePaginate> call(RespArticlePaginate respArticlePaginate) {
if (respArticlePaginate != null && respArticlePaginate.getList() != null) {
try {
articleDao.insertOrReplaceInTx(respArticlePaginate.getList());
} catch (Exception e) {
e.printStackTrace();
}
}
return Observable.just(respArticlePaginate);
}
}))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(createSubscriber(ID_ARTICLE_LIST)))
RxJava 的 Scheduler 做线程池用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class SchedulerUtils {
/**
* io操作较大时选择这个
*/
@NonNull
public static Disposable runInIOThread(Runnable run) {
return Schedulers.io().scheduleDirect(run);
}
public static Disposable runInIOThreadDelay(Runnable run, long milliseconds) {
return Schedulers.io().scheduleDirect(run, milliseconds, TimeUnit.MILLISECONDS);
}
/**
* 计算量较大的时候,选择这个
*/
public static Disposable runInComputeThread(Runnable run) {
return Schedulers.io().scheduleDirect(run);
}
public static Disposable runInComputeThreadDelay(Runnable run, long milliseconds) {
return Schedulers.computation().scheduleDirect(run, milliseconds, TimeUnit.MILLISECONDS);
}
/**
* 仅在一个子线程中处理,选择这个
*/
public static Disposable runInNewThread(Runnable run) {
return Schedulers.newThread().scheduleDirect(run);
}
public static Disposable runInNewThreadDelay(Runnable run, long milliseconds) {
return Schedulers.newThread().scheduleDirect(run, milliseconds, TimeUnit.MILLISECONDS);
}
/**
* 主线程操作选择这个
*/
public static Disposable runInMain(Runnable run) {
return AndroidSchedulers.mainThread().scheduleDirect(run);
}
public static Disposable runInMainDelay(Runnable run, long milliseconds) {
return AndroidSchedulers.mainThread().scheduleDirect(run, milliseconds, TimeUnit.MILLISECONDS);
}
public static Disposable runInSingle(Runnable run) {
return Schedulers.single().scheduleDirect(run);
}
public static Disposable runInSingleDelay(Runnable run, long milliseconds) {
return Schedulers.single().scheduleDirect(run, milliseconds, TimeUnit.MILLISECONDS);
}
}
Subject 实现登录注销登录各个页面状态切换
- 其实就是个观察者而已
- 利用 Relay 实现可能更好,避免
UndeliverableException
终止状态 - 还可以利用 Lifecycle 自动 dispose
- PublishSubject 实现的简易版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class UserCenter private constructor() {
private lateinit var mUserInfo: UserInfo
var subject: Subject<UserInfo> = PublishSubject.create()
fun register(): Subject<UserInfo> {
return subject
}
fun update(userInfo: UserInfo) {
subject.onNext(userInfo)
}
fun loginIn(userInfo: UserInfo) {
subject.onNext(userInfo)
}
fun loginOff() {
subject.onError(RuntimeException("退出登录了"))
}
companion object {
@JvmStatic
fun getInstance(): UserCenter {
return ObjectHolder.instance
}
}
private object ObjectHolder {
val instance: UserCenter = UserCenter()
}
}
// 调用
UserCenter.getInstance().register()
.subscribe(new Observer<UserInfo>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(UserInfo userInfo) {
ToastUtils.showShort("用户信息更新了:" + userInfo);
}
@Override
public void onError(Throwable e) {
ToastUtils.showShort("用户退出登录了,记得切换状态");
}
@Override
public void onComplete() {
}
});
BehaviorSubject 预缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class RxPreLoader<T> {
// 能够缓存订阅之前的最新数据
private BehaviorSubject<T> mBehaviorSubject;
private Disposable preDisposable;
private Disposable disposable;
private RxPreLoader(@NonNull Observable<T> preloadObservable) {
mBehaviorSubject = BehaviorSubject.create();
preDisposable = preloadObservable
.subscribe(
data -> {
if (mBehaviorSubject != null) {
publish(data);
}
},
throwable -> {
// nothing to do
if (mBehaviorSubject != null) {
mBehaviorSubject.onError(throwable);
}
},
() -> {
if (mBehaviorSubject != null) {
mBehaviorSubject.onComplete();
}
});
}
public static <T> RxPreLoader<T> preLoad(@NonNull Observable<T> preloadObservable) {
return new RxPreLoader<>(preloadObservable);
}
/**
* 发送事件
*/
private void publish(T data) {
mBehaviorSubject.onNext(data);
}
public Disposable subscribe(Consumer<T> onNext) {
disposable = mBehaviorSubject.subscribe(onNext);
return disposable;
}
public Disposable subscribe(DisposableObserver<T> observer) {
disposable = mBehaviorSubject.subscribeWith(observer);
return disposable;
}
public Disposable subscribe(ResourceObserver<T> observer) {
disposable = mBehaviorSubject.subscribeWith(observer);
return disposable;
}
/**
* 反订阅
*/
public void dispose() {
if (preDisposable != null && !preDisposable.isDisposed()) {
preDisposable.dispose();
preDisposable = null;
}
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
disposable = null;
}
}
/**
* 获取缓存数据的Subject
*/
public BehaviorSubject<T> getCacheDataSubject() {
return mBehaviorSubject;
}
/**
* 直接获取最近的一个数据
*/
@Nullable
public T getCacheData() {
return mBehaviorSubject.getValue();
}
}
重试
重试抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
public class RetryWhen implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int retryTimes;
private final int delayMillis;
/**
* RetryWhen
*
* @param retryTimes 重试次数
* @param delayMillis 延时,单位ms
* @return RetryWhen
*/
public RetryWhen create(int retryTimes, int delayMillis) {
return new RetryWhen(retryTimes, delayMillis);
}
private RetryWhen(int retryTimes, int delayMillis) {
this.retryTimes = retryTimes;
this.delayMillis = delayMillis;
}
@Override
public Observable<?> apply(Observable<? extends Throwable> observable) {
return observable
.zipWith(Observable.range(1, retryTimes),
new BiFunction<Throwable, Integer, Pair<Integer, Throwable>>() {
@Override
public Pair<Integer, Throwable> apply(Throwable throwable, Integer integer) throws Exception {
return Pair.create(integer, throwable);
}
})
.flatMap(new Function<Pair<Integer, Throwable>, Observable<?>>() {
@Override
public Observable<?> apply(Pair<Integer, Throwable> pair) {
if (pair.first < retryTimes) {
return Observable.timer(delayMillis, TimeUnit.MILLISECONDS);
} else {
return Observable.error(pair.second);
}
}
});
}
/**
* Container to ease passing around a tuple of two objects. This object provides a sensible
* implementation of equals(), returning true if equals() is true on each of the contained
* objects.
*/
private static final class Pair<F, S> {
public final F first;
public final S second;
/**
* Constructor for a Pair.
*
* @param first the first object in the Pair
* @param second the second object in the pair
*/
public Pair(F first, S second) {
this.first = first;
this.second = second;
}
/**
* Checks the two objects for equality by delegating to their respective {@link
* Object#equals(Object)} methods.
*
* @param o the {@link Pair} to which this one is to be checked for equality
* @return true if the underlying objects of the Pair are both considered equal
*/
@Override
public boolean equals(Object o) {
if (!(o instanceof Pair)) {
return false;
}
Pair<?, ?> p = (Pair<?, ?>) o;
return Objects.equals(p.first, first) && Objects.equals(p.second, second);
}
/**
* Compute a hash code using the hash codes of the underlying objects
*
* @return a hashcode of the Pair
*/
@Override
public int hashCode() {
return (first == null ? 0 : first.hashCode()) ^ (second == null ? 0 : second.hashCode());
}
@Override
public String toString() {
return "Pair{" + String.valueOf(first) + " " + String.valueOf(second) + "}";
}
/**
* Convenience method for creating an appropriately typed pair.
*
* @param a the first object in the Pair
* @param b the second object in the pair
* @return a Pair that is templatized with the types of a and b
*/
public static <A, B> Pair<A, B> create(A a, B b) {
return new Pair<A, B>(a, b);
}
}
}
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static void retry2() {
new Test().sourceMayFailThrow()
.retryWhen(RetryWhen.create(3, 500))
// .subscribeOn(Schedulers.io())
// .observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println("testRetry complete: " + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
System.out.println("testRetry error: " + throwable.toString());
}
});
}
private Observable<Integer> sourceMayFailThrow() {
return Observable.just(new Random())
.map(new Function<Random, Integer>() {
@Override
public Integer apply(Random random) {
return random.nextInt(100);
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println("integer:" + integer + ",executeTimes: " + executeTimes++);
if (integer > 50) {
throw new RuntimeException();
}
}
});
}
重试不抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public abstract class RetryWrapper<T> {
private Observable<T> source;
private int retryCount = 0;
public RetryWrapper(Observable<T> source) {
this.source = source;
}
/**
* 重试
*
* @param retryTimes 最大重试次数
* @return Observable<T>
*/
public Observable<T> retry(final int retryTimes) {
retryCount = 0;
return source
.doOnNext(new Consumer<T>() {
@Override
public void accept(T t) throws Exception {
if (checkResult(t)) {
return;
}
retryCount++;
if (retryCount < retryTimes) {
throw new ResultFailException();
}
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.zipWith(
Observable.range(1, retryTimes), new BiFunction<Throwable, Integer, Integer>() {
@Override
public Integer apply(Throwable throwable, Integer integer) throws Exception {
return integer;
}
});
}
});
}
/**
* 校验结果,校验不通过需要重试
*
* @param t t
* @return true校验通过;false校验不通过
*/
abstract public boolean checkResult(T t);
private static class ResultFailException extends RuntimeException {
}
}
public class BooleanRetryWrapper extends RetryWrapper<Boolean> {
public BooleanRetryWrapper(Observable<Boolean> source) {
super(source);
}
@Override
public boolean checkResult(Boolean aBoolean) {
return aBoolean;
}
}
案例 1
小于 50 的数值,需要重试 3 次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Test {
private int executeTimes;
public static void main(String[] args) throws InterruptedException {
System.out.println("");
Observable<Boolean> sourceMayFail = new Test().sourceMayFail();
new BooleanRetryWrapper(sourceMayFail)
.retry(3)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.retry()
.subscribe(
new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) {
System.out.println("testRetry complete: " + aBoolean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
System.out.println("testRetry error: " + throwable.toString());
}
});
Thread.sleep(50000);
}
private Observable<Boolean> sourceMayFail() {
return Observable.just(new Random())
.map(new Function<Random, Integer>() {
@Override
public Integer apply(Random random) {
return random.nextInt(100);
}
})
.map(new Function<Integer, Boolean>() {
@Override
public Boolean apply(Integer integer) {
System.out.println("integer:" + integer + ",executeTimes: " + executeTimes++);
return integer < 20;
}
});
}
结果:
1
2
3
4
5
6
7
8
9
// 大于20的需要重试,最多3次
integer:94,executeTimes: 0
integer:48,executeTimes: 1
integer:58,executeTimes: 2
testRetry complete: false
// 小于20的不需要重试,校验通过
integer:12,executeTimes: 0
testRetry complete: true
案例 2
字符串长度大于 5 需要重试
1
2
3
4
5
6
7
8
9
10
11
public final class StringRetryWrapper extends RetryWrapper<String> {
private static final int len = 5;
public StringRetryWrapper(Observable<String> source) {
super(source);
}
@Override
public boolean checkResult(String s) {
// 字符串长度大于5需要重试
return s.length() <= 5;
}
}
失败重试机制
- retryWhen
只要 Function 的 Observable 不是发射 Error,那么就会进行 retry。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
fun startExecutingWithExponentialBackoffDelay() {
val disposableSubscriber = object : DisposableSubscriber<Any>() {
override fun onNext(aVoid: Any) {
LogUtil.logd(TAG, "startExecutingWithExponentialBackoffDelay", "on Next")
tv_threading_log.append("onNext\n")
}
override fun onComplete() {
LogUtil.logd(TAG, "startExecutingWithExponentialBackoffDelay", "on Completed")
tv_threading_log.append("onComplete\n")
}
override fun onError(e: Throwable) {
e.printStackTrace()
LogUtil.logw(TAG, "startExecutingWithExponentialBackoffDelay", "Error: " + e.message)
tv_threading_log.append("onError\n")
}
}
Flowable.error<Any>(RuntimeException("testing retryWhen error!"))
.retryWhen(RetryWithDelay(5, 1000))
.doOnSubscribe { subscription ->
LogUtil.logi(TAG, "Retry", "尝试5次机会,每次间隔1秒")
tv_threading_log.append("[Retry] Attempting the impossible 5 times in intervals of 1s\n")
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(disposableSubscriber)
}
inner class RetryWithDelay(private val _maxRetries: Int, private val _retryDelayMillis: Int) : Function<Flowable<out Throwable>, Publisher<*>> {
private var _retryCount: Int = 0
override fun apply(inputObservable: Flowable<out Throwable>): Publisher<*> {
return inputObservable.flatMap { throwable ->
if (++_retryCount < _maxRetries) {
LogUtil.logi(TAG, "RetryWithDelay", String.format("Retrying in %d次 ,总次数%d,%d ms", _retryCount, _maxRetries, _retryCount * _retryDelayMillis))
SchedulerUtils.runInMain {
tv_threading_log.append(String.format("[RetryWithDelay] Retrying in %d次 ,总次数%d,%d ms\n", _retryCount, _maxRetries, _retryCount * _retryDelayMillis))
}
return@flatMap Flowable.timer((_retryCount * _retryDelayMillis).toLong(), TimeUnit.MILLISECONDS)
} else {
LogUtil.logw(TAG, "RetryWithDelay", "Argh! 我放弃了")
SchedulerUtils.runInMain {
tv_threading_log.append("[RetryWithDelay] Argh!我放弃了\n")
}
return@flatMap Flowable.error<Any>(throwable)
}
}
}
}
效果:
1
2
3
4
5
6
7
I/hacket.rxjava2: 【Retry】尝试5次机会,每次间隔1秒,线程:main,日期:2018-11-12 19:08:47
I/hacket.rxjava2: 【RetryWithDelay】Retrying in 1次 ,总次数5,1000 ms,线程:main,日期:2018-11-12 19:08:47
I/hacket.rxjava2: 【RetryWithDelay】Retrying in 2次 ,总次数5,2000 ms,线程:RxComputationThreadPool-2,日期:2018-11-12 19:08:48
I/hacket.rxjava2: 【RetryWithDelay】Retrying in 3次 ,总次数5,3000 ms,线程:RxComputationThreadPool-3,日期:2018-11-12 19:08:50
I/hacket.rxjava2: 【RetryWithDelay】Retrying in 4次 ,总次数5,4000 ms,线程:RxComputationThreadPool-4,日期:2018-11-12 19:08:53
W/hacket.rxjava2: 【RetryWithDelay】Argh! 我放弃了,线程:RxComputationThreadPool-5,日期:2018-11-12 19:08:57
W/hacket.rxjava2: 【startExecutingWithExponentialBackoffDelay】Error: testing retryWhen error!,线程:main,日期:2018-11-12 19:08:57
失败重试,token 过期续签
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
private int mRetryCount;
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(throwable -> {
if (throwable instanceof IOException || throwable instanceof SocketException) { // 网络异常重试3次
mRetryCount++;
if (mRetryCount > 3) {
System.out.println("错误超过3次");
return Observable.error(throwable);
} else {
System.out.println("错误" + mRetryCount + "次");
return Observable.timer(mRetryCount * 1000, TimeUnit.MILLISECONDS);
}
/*} else if (throwable instanceof ApiException) { // token过期,重试
if (((ApiException) throwable).getCode() == 401) {
return UploadManager.getInstance().getImageTokenObservable();
}
return Observable.error(throwable);*/
} else { // 未知异常直接返回发送 error 的 Observable
System.out.println("未知异常");
throwable.printStackTrace();
return Observable.error(throwable);
}
});
}
});
轮询
定时轮询取前 n 条
interval+take 实现定时轮询,取前 N 条数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Disposable d
Flowable.interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
.map(this::_doNetworkCallAndGetStringResult)
.take(pollCount)
.doOnSubscribe(
subscription -> {
_log(String.format("Start simple polling - %s", _counter));
})
.subscribe(
taskName -> {
_log(
String.format(
Locale.US,
"Executing polled task [%s] now time : [xx:%02d]",
taskName,
_getSecondHand()));
});
轮询之延时轮询 repeatWhen
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
Flowable.just(1L)
.repeatWhen(new RepeatWithDelay(pollCount, pollingInterval))
.subscribe(
o ->
_log(
String.format(
Locale.US,
"Executing polled task now time : [xx:%02d]",
_getSecondHand())),
e -> Timber.d(e, "arrrr. Error")));
public class RepeatWithDelay implements Function<Flowable<Object>, Publisher<Long>> {
private final int _repeatLimit;
private final int _pollingInterval;
private int _repeatCount = 1;
RepeatWithDelay(int repeatLimit, int pollingInterval) {
_pollingInterval = pollingInterval;
_repeatLimit = repeatLimit;
}
// this is a notificationhandler, all we care about is
// the emission "type" not emission "content"
// only onNext triggers a re-subscription
@Override
public Publisher<Long> apply(Flowable<Object> inputFlowable) throws Exception {
// it is critical to use inputObservable in the chain for the result
// ignoring it and doing your own thing will break the sequence
return inputFlowable.flatMap(
new Function<Object, Publisher<Long>>() {
@Override
public Publisher<Long> apply(Object o) throws Exception {
if (_repeatCount >= _repeatLimit) {
// terminate the sequence cause we reached the limit
_log("Completing sequence");
return Flowable.empty();
}
// since we don't get an input
// we store state in this handler to tell us the point of time we're firing
_repeatCount++;
return Flowable.timer(_repeatCount * _pollingInterval, TimeUnit.MILLISECONDS);
}
});
}
}
轮询请求某个接口,得到值时返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
object PollTest2 {
@Suppress("CheckResult")
fun interval(milliseconds: Long) {
Observable.interval(milliseconds, TimeUnit.MILLISECONDS, Schedulers.io())
.doOnNext(object : Consumer<Long?> {
override fun accept(aLong: Long?) {
println("doOnNext " + aLong + "- " + Thread.currentThread().name)
}
})
.map<DDLInfo>(
Function { aLong ->
println("模拟获取sp数据 start.【$aLong】sleep 5s" + Thread.currentThread().name)
if (aLong == 5L) {
return@Function DDLInfo("deeplink", System.currentTimeMillis())
}
SleepTools.second(1)
println("----模拟获取sp数据 end.【$aLong】" + Thread.currentThread().name)
DDLInfo()
}
)
.takeUntil { ddlInfo ->
println("takeUntil s=" + ddlInfo + " " + Thread.currentThread().name)
ddlInfo.isValid()
}
.filter { ddlInfo ->
System.err.println("filter s=" + ddlInfo + " " + Thread.currentThread().name)
ddlInfo.isValid()
}
.timeout(3000L, TimeUnit.MILLISECONDS) // 2秒超时
.onErrorReturnItem(DDLInfo())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(
object : Consumer<DDLInfo> {
override fun accept(t: DDLInfo?) {
println("==========succeed " + t + "," + Thread.currentThread().name)
println()
}
},
object : Consumer<Throwable?> {
override fun accept(throwable: Throwable?) {
throwable?.printStackTrace()
System.err.println("error: " + throwable?.message + Thread.currentThread().name)
println()
}
}
)
}
}
data class DDLInfo(
val deeplink: String = "",
val timestamp: Long = 0L
) {
fun isValid(): Boolean {
return deeplink.isNotBlank()
}
}
fun main() {
PollTest2.interval(200L)
Thread.sleep(12_000L)
}
- interval 用来每隔多少时间执行一次
- takeUntil 直到什么条件结束(返回 false 的往下走,返回 true 就终止)
- filter 过滤,返回的 true 的往下走,false 的丢弃
- timeout 超时
- onErrorReturnItem 失败时返回的默认数据,timeout 超时时也会走到这里
采用 repeat、结合 retry 实现轮询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 按照顺序loop,意味着第一次结果请求完成后,再考虑下次请求
private void loopSequence() {
Disposable disposable = getDataFromServer()
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "loopSequence subscribe");
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "loopSequence doOnNext: " + integer);
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "loopSequence doOnError: " + throwable.getMessage());
}
})
.delay(5, TimeUnit.SECONDS, true) // 设置delayError为true,表示出现错误的时候也需要延迟5s进行通知,达到无论是请求正常还是请求失败,都是5s后重新订阅,即重新请求。
.subscribeOn(Schedulers.io())
.repeat() // repeat保证请求成功后能够重新订阅。
.retry() // retry保证请求失败后能重新订阅
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
view.showText(integer + "");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
view.showText(throwable.getMessage());
}
});
compositeDisposable.add(disposable);
}
RxJava 中的 repeat 操作符可以在原始数据源发射数据完成后重新订阅数据源,而 retry 可以在原始数据源产生错误后重新订阅数据源。结合起来就可以在无论是成功还是失败的都能重新执行任务,则实现了轮询请求。再结合 delay 操作符,实现延迟执行任务。