# 使用笔记

## **啥是响应式编程**

使用可观察流进行的异步编程（Asynchronous programing with observable streams）

* 异步编程
  * 耗时操作在主线程之外执行
  * 事件驱动，事件本身是异步的并且随时都有可能发生
* 可观察\
  基于观察者模式，可由多个观察者同时订阅，在状态发生变化时，通知所有观察者
* 流\
  事件序列
* 可观察流\
  可以订阅的事件序列
* 响应式编程\
  基于推送、观察者模式来异步处理数据流的方法
* 数据是第一公民\
  程序可以看做由数据驱动，而不是线程按顺序执行

### RxJava 特点

* 基于观察者模式
* 改造迭代器模式，使数据基于推送传递给观察者
* 函数式编程
* 简洁\
  借助 Lambda
* 链式调用
* 支持懒加载
* 实现并发简单\
  切换线程十分方便
* 异步错误处理\
  观察者可以进行错误处理

## **RxJava 核心组件**

### 3 O

Observable,Observer and Operator

#### Observale 数据流，可观察对象

* 提供了多个重载的 subscibe 方法用于注册 Observer
* 虽然基于推送（回调），但并不是异步的。默认情况下，Observable 是同步的，事件将会在 subscribe() 执行的线程产生
* Hot or Cold
  * Cold Observale 在每个观察者订阅时，会重新产生一遍数据
  * Hot Observable 则会按照顺序产生事件，观察者只能收到自订阅后产生的事件
* 创建 Observable\
  RxJava 提供了一系列接口用来创建 Observable
  * Observable.just()：将出入的参数组装成一个 Observable\
    支持 1 到 9 个数据
  * Observable.fromArray()/Observable.fromIterable()\
    通过数组或可迭代集合创建一个 Observable
  * Observable.range()/Observable.rangeLong()\
    通过一个区间创建产生整数或 long 类型的 Observable
  * Observable.empty()\
    不产生数据，只调用 onCompelte()
  * Observable.error()\
    只调用 onError()
  * Observable.never()\
    不调用 Observer 的任何方法，一般用于测试
  * **Observable.create()**\
    更加灵活和基础的创建方法，可以根据需要调用 Observer 的方法
  * 懒发射
    * 以下两个创建方法中，参数提供的操作只会在订阅后才开始执行
    * Observable.fromCallable()\
      参数提供的函数返回要发射的数据
    * Observable.defer()\
      参数提供的函数返回一个 Observable（当 Observable 本身的创建也很耗时的时候，用 defer() 比较合适）

#### Observer 观察者，接收 Observable 发出的数据。当 onComplete()和 onError 回调后，就不会再产生事件

* 事件回调
  * onSubscribe(Disposable d)\
    订阅成功后回调,Disposable 可以用来取消订阅
  * onNext(T value)\
    每当当 Observable 产生一个事件，都会回调
  * onComplete()\
    Observable 不再产生数据时回调
  * onError(Throwable e)\
    Observable 内部出错时回调

#### Operator

对数据进行一系列转换

* 支持链式调用
* 提高可读性
* 让开发者更关注做什么而不是怎么做
* onNext() 接收到的数据最好是最终数据，转换都应该放在 Operator 中

## **Operators**

> 不要在 Operator 中修改原数据

* 变换与过滤
  * map:将一个类型转换成另外一种类型
  * filter：将不符合条件的数据过滤掉
* FlatMap
  * 传入的函数需要返回一个 Observable
  * 返回的 Observable 会立刻被订阅，并且产生的数据将被合并发送到 Observer
  * 如果有延迟，flatMap 产生的序列 可能和 Observable 的产生顺序不一致
* ConcatMap
  * 与 FlatMap 类似，不过会等前一个 Observable 产生完数据，才会订阅下一个，因此不会出现数据交叉的情况
* Skip：忽略前面 n 个数据
* Take：只取前面 n 个数据
* Fisrt\&Last：只取第一个或最后一个数据\
  返回一个特殊的 Observable：Single（只产生一个数据）last 等所有数据产生完成后才会收到数据​
* 组合
  * Observable.merge():将多个 Observable 组合成一个，数据可能会交叉产生，当其中一个 Observable 调用 onComplete()后，合并后的 Observable 就不再产生数据
  * Observable.concat():多个 Observable 按顺序订阅，前一个产生完数据后才会订阅后一个，因此产生数据不会交叉，所有 Observable 都完成后才会结束
  * Observable.zip():将多个 Observable 的数据按产生顺序组合成一个新的 Observable 序列，新 Observable 序列的长度等于包含数据最少的 Observable 的长度\
    如果一个 Observable 产生数据过快 ，那么会缓存产生过快的 Observable 数据并等待较慢的那个
* 聚合
  * toList():将一个 Observable 的数据打包成一个 List，并返回 Observable\<List\<T>>\
    调用 onComplete()后才会产生 toList(),如果没有调用onComplete ，那么 toList 也不会被调用
  * reduce():将元素累积，形成一个最终结果\
    比如累加。不能用于将数据累积到一个可变的数据结构中
  * collect():用于将元素累积到一个集合中,需要传一个 Callable 用于创建一个集合，然后再把数据累积到集合中\
    比如将字符拼接到一个 StringBuilder
* 工具性 Operator
  * doOnEach():每产生一个事件就会被回调（包括 onError onComplete）
  * donOnNext() doOnError() doOnComplete()
* Cache:用于缓存Observable 产生的事件\
  可以配合 timestamp() 来为缓存事件加上时间戳，用于判断是否需要重新产生事件
* 重用操作链
  * **将一系列操作链封装到ObservableTransformer 中，然后通过 Observable.compose()方法传入**

## **多线程**

Scheduler 的使用

* RxJava 默认是同步的
* Schedulers
  * Schedulers.newThread():新建一个线程
  * Schedulers.single():单线程，任务按顺序执行
  * Schedulers.from(Executor):基于提供的 Executor
  * [Schedulers.io](http://schedulers.io/)():主要用于 IO 操作，线程数会随着任务适当增加
  * Schedulers.computation():用于 CPU 密集型任务，使用等于 CPU 核数的线程（因为创建多个线程没有意义，反而会消耗性能）
  * Schedulers.trampoline():在当前线程将任务组织成队列执行
* Observable.subscribeOn():控制 Observable 的创建过程在哪个 Scheduler 执行
* Observable.observeOn()：控制观察者的行为在哪个 Scheduler 执行\
  RxAndroid 提供了 Android 平台的线程模型，比如 AnroidSchduler.mainThread()
* 细节
  * 链式调用中，只有第一个 subscribeOn 会生效，后续的都不会起作用\
    如果不想调用者修改Observable 的创建执行线程，可以先指定线程
  * observeOn 可以被调用多次，每次都会影响后面的操作\
    应该确保 observeOn 尽可能靠近 subscribe 方法，以确保计算操作尽肯能少的出现在主线程
* 使用 FlatMap 进行并发：在创建 Observable 的时候通过 subscribeOn 指定执行线程\
  不能保证顺序

## **Reactive Modeling on Android**

* 非响应式方法\
  RxJava 提供的用来将 Observable 转换成非响应式的方法，应尽量避免使用
  * blockingFirst()：阻塞调用线程，直到 Observable 产生第一个元素
  * blockingLast()：阻塞调用线程，直到 Observable 产生最后一个元素
  * blockingNext()：返回一个 Iterable 对象，可以用来遍历 Observable 产生的所有数据，每遍历一个数据都会阻塞调用线程
  * blockingSingle()：阻塞调用线程，直到 Observable 产生一个元素以及一个后续的 onComplete 事件，否则会抛出异常
  * blockingSubscribe()：阻塞调用线程，直到 Observable 产生一个终止事件（onNext 被忽略）
* 懒加载\
  耗时任务应该在订阅后才开始执行
  * 使用 defer 或者 fromCallable 来包装创建 Observable 的操作即可
* Reactive Everything
  * 处理耗时操作\
    网络请求、文件读写、Bitmap 处理等
    * 将耗时操作包装到 Observable 中，并制定执行线程，这样调用端就不用关心该操作是否会阻塞主线程
    * Completable、Single 和 Maybe\
      几种特殊的 Observable
      * Single：只产生一个元素
        * 对应 SingleObserver 只有三个回调：onSubscribe(Disposable d) 、onSuccess(T value) 和 onError(Throwable t)
      * Completable：不产生元素，只通知操作是否成功
        * 对应 CompletableObserver 回调：onSubscribe(Disposable d) 、onComplete() 和 onError(Throwable t)
      * Maybe:可能产生一个元素，也可能没有元素，可以看做 Single 和 Completable 的组合
        * 对应 MaybeObserver 回调：
          * onSubscribe(Disposable d)
          * onSuccess(T value)
          * onComplete()：没有产生元素时回调
          * onError(Throwable t)
      * 与 Observable 相互转换
        * Completable、Single 和 Maybe 有 toObservable 方法
        * Observable 的 toList 方法可以转换成 Single
  * 替换回调\
    通过 Observable.create()方法，创建 Observable 并调用观察者的一系列方法来更新数据，替换常用的回调形式编码
    * 多点传播
      * Cold Observable to Hot Observable\
        create() 方法返回的是一个 Cold Observable（新的观察者订阅后，会重新创建数据源并收到已产生的所有事件），有时需要将Cold Observable 转为 Hot Observable（多个观察者共享数据源，新观察者只收到自订阅后产生的数据）
        * 调用 .publish() 可以将一个 Observable 转换成 ConnectableObservable，即可转换成 Hot Observable\
          只调用.publish() ，观察者的 onNext() 方法并不会调用，还需要调用 .connect()方法，ConnectableObservable 才开始产生数据。refCount()方法保证只要有观察者订阅，​ConnectableObservable 就回保持与数据源的连接
        * 调用 .share() 方法也可以将一个 Cold Observable 转换成 Hot Observable，相当于调用 .publish().refCount()
      * Subjects\
        既可以当作数据源，又可以当做观察者。通过它也可以实现共享数据源的多点传播。
        * PublishSubject：观察者只能收到订阅后产生的数据
        * AsyncSubject：观察者只能收到 Observable 完成后的最后一个数据
        * BehaviorSubject：观察者只能收到最近产生的一个数据以及其后的所有数据
        * ReplaySubject：观察者能收到 Observable 产生过的所有旧数据以及之后产生的新数据
  * View 事件
    * 使用 RxBinding 可以将 View 的事件组织成 Observable
  * Disposable 和 Activity/Fragment 的生命周期\
    用于取消订阅，如果没有在合适的时机取消订阅，可能会产生内存泄漏
    * 不需要时就可以取消订阅，比如在 Activity 的 onDestroy() onPause()方法中
    * 使用 RxLiftCycle 库，可以在收到 Activity 或 Fragment 指定生命周期回调时自动取消订阅
    * 使用 Google 的 LifeCycle 库也可以

## **Backpressure(背压)**

> 解决数据产生速度比消费速度快的问题。如果生产者速度过快，则减慢其速度。

* Flowable\
  支持背压的 Observable
  * 支持的背压策略
    * BackpressureStrategy.ERROR:当下游观察者跟不上数据产生 速度时，抛出一个 MissingBackpressureException
    * **BackpressureStrategy.BUFFER（默认策略）:在下游观察者处理前，缓存已经产生但是未处理的数据（默认缓存数量为 128）**
    * BackpressureStrategy.DROP:丢弃最近产生的数据
    * BackpressureStrategy.LATEST:只保留最新产生的数据
    * BackpressureStrategy.MISSING:无背压，等同于直接使用 Observable
  * 什么时候用 Observable
    * 数据量小，不太可能出现 OOM
    * 处理 GUI 事件，不会很频繁
  * 什么时候用 Flowable\
    需要控制数据获取量
    * 数据量很大，可以控制产生的数据量
    * 文件读写：控制读取量
    * 数据库
    * 网络流
* Subscriber\
  比 Observer 多一个 onSubscribe(Subcribtion s)方法，Subcribtion 有一个 request(long)方法可以用来向数据源请求数据
* 限流
  * throttleFirst(long,timeunit) 、throttleFirst(long,timeunit):获取一段时间内的第一个或者最后一个事件
  * sample(long,timeunit):获取一段时间内最后一个事件（采样）
* 缓存
  * buffer:将一些数据缓存到 list
    * .buffer(int count): 缓存一定数量的数据
    * .buffer(ObservableSource boundary)：以 boundary 产生的数据为边界缓存数据
    * .buffer(long timeSpan,TimeUnit unit):缓存一定时间内的数据
  * window:将一些数据缓存成 Observable\
    便于链式操作甚至并发（类似于 flatMap）

## **Error Handling**

* 产生一个异常
  * observer.onError()
  * Obserable.error()
* Observer 的 onError() 回调\
  这是一个终止事件，一旦产生错误，Observable 将不会产生新数据
* 严重的异常（如 OOM、VMError）不会传递给观察者，而是在异常产生线程直接抛出
* 异常处理操作
  * Observable.onErrorReturnItem(T item):发生异常时不调用 onError，而是返回该方法提供的 item
  * Observable.onErrorReturn(Function valueSupplier)：发生异常时，返回 valueSupplier 提供的 item\
    用于根据不同错误返回对应的 item
  * Observable.onErrorResumeNext(ObservableSource next)：发生异常时，由另一个 Observable 继续提供数据
* 重试操作\
  重试条件不满足后，继续调用Observer 的 onError
  * Observable.retry():发生异常时重新订阅，让 Observable 继续完成数据产生\
    可以设置重试次数以及重试条件
  * Observable.retryWhen()\
    可以设置每次重试的时机。例如指数延迟，下一次重试的等待时间是本次的2倍
* 处理未传递异常\
  异常发生时，没有提供您 onError 实现或者订阅关系已取消
  * 未处理异常会调用 RxJavaPlugin.onError()默认打出堆栈信息
  * 可以通过 RxJavaPlugins.setErrorHandler() 设置未传递异常处理函数，这样就不会调用RxJavaPlugin.onError()（默认会 crash）\
    这种方式可以避免 UI 线程的崩溃，便于记录日志等
