使用笔记
啥是响应式编程
使用可观察流进行的异步编程(Asynchronous programing with observable streams)
异步编程
耗时操作在主线程之外执行
事件驱动,事件本身是异步的并且随时都有可能发生
可观察 基于观察者模式,可由多个观察者同时订阅,在状态发生变化时,通知所有观察者
流 事件序列
可观察流 可以订阅的事件序列
响应式编程 基于推送、观察者模式来异步处理数据流的方法
数据是第一公民 程序可以看做由数据驱动,而不是线程按顺序执行
RxJava 特点
基于观察者模式
改造迭代器模式,使数据基于推送传递给观察者
函数式编程
简洁 借助 Lambda
链式调用
支持懒加载
实现并发简单 切换线程十分方便
异步错误处理 观察者可以进行错误处理
RxJava 核心组件
3 O
Observable,Observer and Operator
Observale
数据流,可观察对象
提供了多个重载的 subscibe 方法用于注册 Observer
虽然基于推送(回调),但并不是异步的。默认情况下,Observable 是同步的,事件将会在 subscribe() 执行的线程产生
Hot or Cold
Cold Observale 在每个观察者订阅时,会重新产生一遍数据
Hot Observable 则会按照顺序产生事件,观察者只能收到自订阅后产生的事件
创建 Observable RxJava 提供了一系列接口用来创建 Observable
Observable.just():将出入的参数组装成一个 Observable 支持 1 到 9 个数据
Observable.fromArray()/Observable.fromIterable() 通过数组或可迭代集合创建一个 Observable
Observable.range()/Observable.rangeLong() 通过一个区间创建产生整数或 long 类型的 Observable
Observable.empty() 不产生数据,只调用 onCompelte()
Observable.error() 只调用 onError()
Observable.never() 不调用 Observer 的任何方法,一般用于测试
Observable.create() 更加灵活和基础的创建方法,可以根据需要调用 Observer 的方法
懒发射
以下两个创建方法中,参数提供的操作只会在订阅后才开始执行
Observable.fromCallable() 参数提供的函数返回要发射的数据
Observable.defer() 参数提供的函数返回一个 Observable(当 Observable 本身的创建也很耗时的时候,用 defer() 比较合适)
Observer
观察者,接收 Observable 发出的数据。当 onComplete()和 onError 回调后,就不会再产生事件
事件回调
onSubscribe(Disposable d) 订阅成功后回调,Disposable 可以用来取消订阅
onNext(T value) 每当当 Observable 产生一个事件,都会回调
onComplete() Observable 不再产生数据时回调
onError(Throwable e) Observable 内部出错时回调
Operator
对数据进行一系列转换
支持链式调用
提高可读性
让开发者更关注做什么而不是怎么做
onNext() 接收到的数据最好是最终数据,转换都应该放在 Operator 中
Operators
不要在 Operator 中修改原数据
变换与过滤
map:将一个类型转换成另外一种类型
filter:将不符合条件的数据过滤掉
FlatMap
传入的函数需要返回一个 Observable
返回的 Observable 会立刻被订阅,并且产生的数据将被合并发送到 Observer
如果有延迟,flatMap 产生的序列 可能和 Observable 的产生顺序不一致
ConcatMap
与 FlatMap 类似,不过会等前一个 Observable 产生完数据,才会订阅下一个,因此不会出现数据交叉的情况
Skip:忽略前面 n 个数据
Take:只取前面 n 个数据
Fisrt&Last:只取第一个或最后一个数据 返回一个特殊的 Observable:Single(只产生一个数据)last 等所有数据产生完成后才会收到数据
组合
Observable.merge():将多个 Observable 组合成一个,数据可能会交叉产生,当其中一个 Observable 调用 onComplete()后,合并后的 Observable 就不再产生数据
Observable.concat():多个 Observable 按顺序订阅,前一个产生完数据后才会订阅后一个,因此产生数据不会交叉,所有 Observable 都完成后才会结束
Observable.zip():将多个 Observable 的数据按产生顺序组合成一个新的 Observable 序列,新 Observable 序列的长度等于包含数据最少的 Observable 的长度 如果一个 Observable 产生数据过快 ,那么会缓存产生过快的 Observable 数据并等待较慢的那个
聚合
toList():将一个 Observable 的数据打包成一个 List,并返回 Observable<List<T>> 调用 onComplete()后才会产生 toList(),如果没有调用onComplete ,那么 toList 也不会被调用
reduce():将元素累积,形成一个最终结果 比如累加。不能用于将数据累积到一个可变的数据结构中
collect():用于将元素累积到一个集合中,需要传一个 Callable 用于创建一个集合,然后再把数据累积到集合中 比如将字符拼接到一个 StringBuilder
工具性 Operator
doOnEach():每产生一个事件就会被回调(包括 onError onComplete)
donOnNext() doOnError() doOnComplete()
Cache:用于缓存Observable 产生的事件 可以配合 timestamp() 来为缓存事件加上时间戳,用于判断是否需要重新产生事件
重用操作链
将一系列操作链封装到ObservableTransformer 中,然后通过 Observable.compose()方法传入
多线程
Scheduler 的使用
RxJava 默认是同步的
Schedulers
Schedulers.newThread():新建一个线程
Schedulers.single():单线程,任务按顺序执行
Schedulers.from(Executor):基于提供的 Executor
Schedulers.computation():用于 CPU 密集型任务,使用等于 CPU 核数的线程(因为创建多个线程没有意义,反而会消耗性能)
Schedulers.trampoline():在当前线程将任务组织成队列执行
Observable.subscribeOn():控制 Observable 的创建过程在哪个 Scheduler 执行
Observable.observeOn():控制观察者的行为在哪个 Scheduler 执行 RxAndroid 提供了 Android 平台的线程模型,比如 AnroidSchduler.mainThread()
细节
链式调用中,只有第一个 subscribeOn 会生效,后续的都不会起作用 如果不想调用者修改Observable 的创建执行线程,可以先指定线程
observeOn 可以被调用多次,每次都会影响后面的操作 应该确保 observeOn 尽可能靠近 subscribe 方法,以确保计算操作尽肯能少的出现在主线程
使用 FlatMap 进行并发:在创建 Observable 的时候通过 subscribeOn 指定执行线程 不能保证顺序
Reactive Modeling on Android
非响应式方法 RxJava 提供的用来将 Observable 转换成非响应式的方法,应尽量避免使用
blockingFirst():阻塞调用线程,直到 Observable 产生第一个元素
blockingLast():阻塞调用线程,直到 Observable 产生最后一个元素
blockingNext():返回一个 Iterable 对象,可以用来遍历 Observable 产生的所有数据,每遍历一个数据都会阻塞调用线程
blockingSingle():阻塞调用线程,直到 Observable 产生一个元素以及一个后续的 onComplete 事件,否则会抛出异常
blockingSubscribe():阻塞调用线程,直到 Observable 产生一个终止事件(onNext 被忽略)
懒加载 耗时任务应该在订阅后才开始执行
使用 defer 或者 fromCallable 来包装创建 Observable 的操作即可
Reactive Everything
处理耗时操作 网络请求、文件读写、Bitmap 处理等
将耗时操作包装到 Observable 中,并制定执行线程,这样调用端就不用关心该操作是否会阻塞主线程
Completable、Single 和 Maybe 几种特殊的 Observable
Single:只产生一个元素
对应 SingleObserver 只有三个回调:onSubscribe(Disposable d) 、onSuccess(T value) 和 onError(Throwable t)
Completable:不产生元素,只通知操作是否成功
对应 CompletableObserver 回调:onSubscribe(Disposable d) 、onComplete() 和 onError(Throwable t)
Maybe:可能产生一个元素,也可能没有元素,可以看做 Single 和 Completable 的组合
对应 MaybeObserver 回调:
onSubscribe(Disposable d)
onSuccess(T value)
onComplete():没有产生元素时回调
onError(Throwable t)
与 Observable 相互转换
Completable、Single 和 Maybe 有 toObservable 方法
Observable 的 toList 方法可以转换成 Single
替换回调 通过 Observable.create()方法,创建 Observable 并调用观察者的一系列方法来更新数据,替换常用的回调形式编码
多点传播
Cold Observable to Hot Observable create() 方法返回的是一个 Cold Observable(新的观察者订阅后,会重新创建数据源并收到已产生的所有事件),有时需要将Cold Observable 转为 Hot Observable(多个观察者共享数据源,新观察者只收到自订阅后产生的数据)
调用 .publish() 可以将一个 Observable 转换成 ConnectableObservable,即可转换成 Hot Observable 只调用.publish() ,观察者的 onNext() 方法并不会调用,还需要调用 .connect()方法,ConnectableObservable 才开始产生数据。refCount()方法保证只要有观察者订阅,ConnectableObservable 就回保持与数据源的连接
调用 .share() 方法也可以将一个 Cold Observable 转换成 Hot Observable,相当于调用 .publish().refCount()
Subjects 既可以当作数据源,又可以当做观察者。通过它也可以实现共享数据源的多点传播。
PublishSubject:观察者只能收到订阅后产生的数据
AsyncSubject:观察者只能收到 Observable 完成后的最后一个数据
BehaviorSubject:观察者只能收到最近产生的一个数据以及其后的所有数据
ReplaySubject:观察者能收到 Observable 产生过的所有旧数据以及之后产生的新数据
View 事件
使用 RxBinding 可以将 View 的事件组织成 Observable
Disposable 和 Activity/Fragment 的生命周期 用于取消订阅,如果没有在合适的时机取消订阅,可能会产生内存泄漏
不需要时就可以取消订阅,比如在 Activity 的 onDestroy() onPause()方法中
使用 RxLiftCycle 库,可以在收到 Activity 或 Fragment 指定生命周期回调时自动取消订阅
使用 Google 的 LifeCycle 库也可以
Backpressure(背压)
解决数据产生速度比消费速度快的问题。如果生产者速度过快,则减慢其速度。
Flowable 支持背压的 Observable
支持的背压策略
BackpressureStrategy.ERROR:当下游观察者跟不上数据产生 速度时,抛出一个 MissingBackpressureException
BackpressureStrategy.BUFFER(默认策略):在下游观察者处理前,缓存已经产生但是未处理的数据(默认缓存数量为 128)
BackpressureStrategy.DROP:丢弃最近产生的数据
BackpressureStrategy.LATEST:只保留最新产生的数据
BackpressureStrategy.MISSING:无背压,等同于直接使用 Observable
什么时候用 Observable
数据量小,不太可能出现 OOM
处理 GUI 事件,不会很频繁
什么时候用 Flowable 需要控制数据获取量
数据量很大,可以控制产生的数据量
文件读写:控制读取量
数据库
网络流
Subscriber 比 Observer 多一个 onSubscribe(Subcribtion s)方法,Subcribtion 有一个 request(long)方法可以用来向数据源请求数据
限流
throttleFirst(long,timeunit) 、throttleFirst(long,timeunit):获取一段时间内的第一个或者最后一个事件
sample(long,timeunit):获取一段时间内最后一个事件(采样)
缓存
buffer:将一些数据缓存到 list
.buffer(int count): 缓存一定数量的数据
.buffer(ObservableSource boundary):以 boundary 产生的数据为边界缓存数据
.buffer(long timeSpan,TimeUnit unit):缓存一定时间内的数据
window:将一些数据缓存成 Observable 便于链式操作甚至并发(类似于 flatMap)
Error Handling
产生一个异常
observer.onError()
Obserable.error()
Observer 的 onError() 回调 这是一个终止事件,一旦产生错误,Observable 将不会产生新数据
严重的异常(如 OOM、VMError)不会传递给观察者,而是在异常产生线程直接抛出
异常处理操作
Observable.onErrorReturnItem(T item):发生异常时不调用 onError,而是返回该方法提供的 item
Observable.onErrorReturn(Function valueSupplier):发生异常时,返回 valueSupplier 提供的 item 用于根据不同错误返回对应的 item
Observable.onErrorResumeNext(ObservableSource next):发生异常时,由另一个 Observable 继续提供数据
重试操作 重试条件不满足后,继续调用Observer 的 onError
Observable.retry():发生异常时重新订阅,让 Observable 继续完成数据产生 可以设置重试次数以及重试条件
Observable.retryWhen() 可以设置每次重试的时机。例如指数延迟,下一次重试的等待时间是本次的2倍
处理未传递异常 异常发生时,没有提供您 onError 实现或者订阅关系已取消
未处理异常会调用 RxJavaPlugin.onError()默认打出堆栈信息
可以通过 RxJavaPlugins.setErrorHandler() 设置未传递异常处理函数,这样就不会调用RxJavaPlugin.onError()(默认会 crash) 这种方式可以避免 UI 线程的崩溃,便于记录日志等
最后更新于
这有帮助吗?