0%

RxJava的简单使用

一、是什么?

  1. “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。简单的异步处理,扩展的观察者模式,灵活的线程控制。
  2. 感谢掘金作者Season的Rxjava2.0系列文章。
  3. Github项目地址: https://github.com/ReactiveX/RxJava https://github.com/ReactiveX/RxAndroid

二、关键词

  1. Observable (可观察者,即被观察者)
  2. Observer (观察者)
  3. subscribe (订阅)
  4. Consumer:只接收onNext()的观察者
  5. Map:变换
  6. FlatMap
  7. Zip:合并
  8. Flowable(背压,生产者的速度大于消费者的速度)
  9. onCompleted(): 事件队列完结
  10. onNext():普通事件
  11. onError(): 事件队列异常

三、基本使用

1. 被观察者(Observable)

决定事件什么时候触发

  1. 使用Observable创建事件队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    /**
    * ObservableEmitter:它可以通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)
    * 发出next事件、complete(完成)事件和error(错误)事件
    *
    * Observable可以发送无限个onNext, Observer也可以接收无限个onNext.
    *
    * 当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送,
    * 而Observer收到onComplete事件之后将不再继续接收事件.
    *
    * 当Observable发送了一个onError后, Observableon的Error之后的事件将继续发送,
    * 而Observer收到onError事件之后将不再继续接收事件.
    */
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    // 完成
    emitter.onComplete();
    }
    });

2. 观察者(Observer)

可以调用Disposable的dispose()方法使Observer不在接收发过来的事件
如果有多个Disposable,RxJava中内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的事件。

事件触发后做什么

  1. Observer接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    Log.e(TAG, "subscribe");
    }

    @Override
    public void onNext(Integer value) {
    Log.e(TAG, "" + value);
    }

    @Override
    public void onError(Throwable e) {
    Log.e(TAG, "error", e);
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "complete");
    Log.e(TAG, "rx01: ====== end ======");
    }
    };
  2. Consumer接口,只接收onNext(),不接收其它的

    1
    2
    3
    4
    5
    6
    7
    8
    // Consumer(消费者)表示只关心onNext事件
    Consumer<Integer> consumer = new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
    Log.e(TAG, "Observer thread is :" + Thread.currentThread().getName());
    Log.e(TAG, "onNext: " + integer);
    }
    };

3. 订阅者(Subscribe)

  1. 创建了 ObservableObserver 之后,再用 subscribe() 方法将它们联结起来

    1
    observable.subscribe(observer);
  2. 当然也可以使用RxJava的链式编程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
    for (int i = 0; i < 3; i++) {
    Log.e(TAG, "发送: " + i);
    e.onNext(i);
    }
    e.onComplete();
    Log.e(TAG, "发送: " + 99);
    e.onNext(99);
    }
    }).subscribe(new Observer<Integer>() {

    // 调用dispose()会导致Observer不在接收事件
    private Disposable disposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
    Log.e(TAG, "绑定: ");
    disposable = d;
    }

    @Override
    public void onNext(@NonNull Integer integer) {
    Log.e(TAG, "接收: " + integer);
    if (integer == 1) {
    disposable.dispose();
    }
    }

    @Override
    public void onError(@NonNull Throwable e) {
    Log.e(TAG, "onError: ", e);
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete: ");
    Log.e(TAG, "rx02: ====== end ======");
    }
    });
    }

四、线程调度(Scheduler)

  • Schedulers.immediate():在当前线程运行,默认模式
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作,数据库,网络等
  • Schedulers.computation(): 计算
  • AndroidSchedulers.mainThread():Android主线程
1
2
3
4
5
6
7
// 后台线程取数据,主线程显示,适用于大多数
observable
// 指定 subscribe(被观察者)发生在 IO 线程,事件产生线程
.subscribeOn(Schedulers.io())
// 指定 Subscriber(观察者)的回调发生在主线程,事件消费线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mSubscriber);

五、变换

1.Map

对原始Observable发出的每一项数据进行相应的操作后在发出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});

2. flatMap

flatMap()接收一个Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.

flatMap是没有顺序的,如果要顺序发送请使用concatMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
// 10毫秒的延时
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});

六、Zip合并

Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}

@Override
public void onNext(String value) {
Log.e(TAG, "onNext: " + value);
}

@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}

@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});

七、Flowable(背压,生产者的速度大于消费者的速度)

如果Observable与Observer不在同一个线程,当被观察者的生产速度大于被观察者的消费速度时会抛出MissingBackpressureException异常。在RxJava2中新增了Flowable专门用于专门应对背压(Backpressure)问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Subscription.request()方法表示Subscriber要处理几个事件
* emitter.requested()方法表示减少几个事件
* 在Flowable中使用Subscription.cancel()关闭事件处理
*/
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 128; i++) {
Log.e(TAG, "emit " + i);
emitter.onNext(i);
}
}
// BackpressureStrategy.ERROR 默认128kb 超出会抛出MissingBackpressureException异常
// BackpressureStrategy.BUFFER 无大小限制
// BackpressureStrategy.DROP 存不下的事件直接丢弃
// BackpressureStrategy.LATEST 只保留最新的事件,与DROP相反
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
// .onBackpressureDrop()//添加背压策略
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {

@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe");
// 全局的
mSubscription = s;
// 处理多少个发送过来的事件
mSubscription.request(Long.MAX_VALUE);
// 取消接收事件
// mSubscription.cancel();
}

@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}

@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}

@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});

八、其它API

  1. Scan:累加器
  2. Filter:过滤器
  3. take()、takeLast():只发送前N个元素、只发送后N个元素
  4. Skip()、SkipLast():不发送前N个元素、不发送后N个元素
  5. distinct:仅处理一次
  6. ElementAt():只发送第N个元素
  7. Sample():定期发射Observable最近发射的数据项
  8. Merge():合并多个Observables的发射物,多输入,单输出
  9. startWith():在开头插入一条

九、附录

  1. 本例源码:https://github.com/sdwfqin/AndroidSamples