一、是什么?
- “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。简单的异步处理,扩展的观察者模式,灵活的线程控制。
- 感谢掘金作者Season的Rxjava2.0系列文章。
- Github项目地址:
https://github.com/ReactiveX/RxJava https://github.com/ReactiveX/RxAndroid
二、关键词
- Observable (可观察者,即被观察者)
- Observer (观察者)
- subscribe (订阅)
- Consumer:只接收onNext()的观察者
- Map:变换
- FlatMap
- Zip:合并
- Flowable(背压,生产者的速度大于消费者的速度)
- onCompleted(): 事件队列完结
- onNext():普通事件
- onError(): 事件队列异常
三、基本使用
1. 被观察者(Observable)
决定事件什么时候触发
使用Observable创建事件队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
/**
* ObservableEmitter:它可以通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)
* 发出next事件、complete(完成)事件和error(错误)事件
*
* Observable可以发送无限个onNext, Observer也可以接收无限个onNext.
*
* 当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送,
* 而Observer收到onComplete事件之后将不再继续接收事件.
*
* 当Observable发送了一个onError后, Observableon的Error之后的事件将继续发送,
* 而Observer收到onError事件之后将不再继续接收事件.
*/
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 完成
emitter.onComplete();
}
});
2. 观察者(Observer)
可以调用Disposable的dispose()方法使Observer不在接收发过来的事件
如果有多个Disposable,RxJava中内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的事件。
事件触发后做什么
Observer
接口1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22Observer<Integer> observer = new Observer<Integer>() {
public void onSubscribe(Disposable d) {
Log.e(TAG, "subscribe");
}
public void onNext(Integer value) {
Log.e(TAG, "" + value);
}
public void onError(Throwable e) {
Log.e(TAG, "error", e);
}
public void onComplete() {
Log.e(TAG, "complete");
Log.e(TAG, "rx01: ====== end ======");
}
};Consumer接口,只接收onNext(),不接收其它的
1
2
3
4
5
6
7
8// Consumer(消费者)表示只关心onNext事件
Consumer<Integer> consumer = new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
Log.e(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.e(TAG, "onNext: " + integer);
}
};
3. 订阅者(Subscribe)
创建了
Observable
和Observer
之后,再用subscribe()
方法将它们联结起来1
observable.subscribe(observer);
当然也可以使用RxJava的链式编程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe( ObservableEmitter<Integer> e)throws Exception {
for (int i = 0; i < 3; i++) {
Log.e(TAG, "发送: " + i);
e.onNext(i);
}
e.onComplete();
Log.e(TAG, "发送: " + 99);
e.onNext(99);
}
}).subscribe(new Observer<Integer>() {
// 调用dispose()会导致Observer不在接收事件
private Disposable disposable;
public void onSubscribe( Disposable d){
Log.e(TAG, "绑定: ");
disposable = d;
}
public void onNext( Integer integer){
Log.e(TAG, "接收: " + integer);
if (integer == 1) {
disposable.dispose();
}
}
public void onError( Throwable e){
Log.e(TAG, "onError: ", e);
}
public void onComplete() {
Log.e(TAG, "onComplete: ");
Log.e(TAG, "rx02: ====== end ======");
}
});
}
四、线程调度(Scheduler)
Schedulers.immediate()
:在当前线程运行,默认模式Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。Schedulers.io()
: I/O 操作,数据库,网络等Schedulers.computation()
: 计算AndroidSchedulers.mainThread()
:Android主线程
1 | // 后台线程取数据,主线程显示,适用于大多数 |
五、变换
1.Map
对原始Observable发出的每一项数据进行相应的操作后在发出。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
2. flatMap
flatMap()接收一个Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.
flatMap是没有顺序的,如果要顺序发送请使用concatMap
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
六、Zip合并
Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
1 | Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() { |
七、Flowable(背压,生产者的速度大于消费者的速度)
如果Observable与Observer不在同一个线程,当被观察者的生产速度大于被观察者的消费速度时会抛出
MissingBackpressureException
异常。在RxJava2中新增了Flowable专门用于专门应对背压(Backpressure)问题。
1 | /** |
八、其它API
- Scan:累加器
- Filter:过滤器
- take()、takeLast():只发送前N个元素、只发送后N个元素
- Skip()、SkipLast():不发送前N个元素、不发送后N个元素
- distinct:仅处理一次
- ElementAt():只发送第N个元素
- Sample():定期发射Observable最近发射的数据项
- Merge():合并多个Observables的发射物,多输入,单输出
- startWith():在开头插入一条