💪
AndroidCollect
  • 写在前面
  • 计算机基础
    • 计算机组成原理
    • 算法
      • 查找
        • 二分查找
      • 排序
        • 简单排序
        • 高级排序
        • 特殊排序
      • 海量数据
      • 思想
        • 贪心
        • 分治
        • 动态规划
        • 回溯
      • 哈希算法
    • 数据结构
      • 队列
        • 知识点
        • 相关题目
          • 用两个栈实现队列
          • 实现循环队列
          • 用链表实现队列
          • 用数组实现队列
      • 栈
        • 相关算法题目
          • 用链表实现栈
          • 用数组实现栈
      • 链表
        • 知识点梳理
        • 相关算法题目
          • 删除倒数第n个结点
          • 合并两个有序链表
          • 检测单链表是否有环
          • 获取中间结点
          • 反转链表
      • 跳表
      • 哈希表
      • 树
        • 二叉树
        • 二叉查找树
        • AVL 树
        • Trie 树
        • 红黑树
      • 堆
        • 存储
        • 堆的应用
      • 图
    • 网络
      • 应用层协议
        • DNS
        • HTTP
        • HTTPS
      • 传输层协议
        • TCP
        • UDP
      • 输入网址后发生了什么
    • 操作系统
      • 内存
    • 数据库
  • 软件工程
    • 编程思想
    • 设计模式
      • 状态模式
      • 装饰器模式
      • 代理模式
      • 责任链模式
      • 建造者模式
      • 单例模式
      • 观察者模式
  • Java
    • 基础
    • 异常
    • 并发编程
      • ThreadLocal
      • 线程池
      • 理解 volatile
      • AbstractQueuedSynchronizer
    • 集合
      • LinkedHashMap 源码
      • HashMap 源码
    • 注解
    • 反射
      • JDK 动态代理
    • JVM
      • 自动内存管理机制
      • Class 文件格式
      • 类加载机制
      • Java 内存模型(JMM)
      • 字节码指令
      • HotSpot 虚拟机实现细节
    • 源码与原理
    • 各版本主要特性
  • Android
    • 基础组件
      • Context
      • Activity
        • 生命周期
        • 启动模式与任务栈
        • 启动流程
      • Service
      • ContentProvider
      • BroadcastReceiver
      • Fragment
      • View
        • 常用控件问题总结
          • RecyclerView
          • ViewPager2
        • CoordinatorLayout
        • SurfaceView
        • 事件分发
        • 绘制流程
        • 自定义 View
        • Window
    • 数据存储
      • 存储结构
      • Sqlite
      • 序列化
      • SharedPreferences
    • 资源
      • 图片加载
    • 动画
      • 属性动画
    • 线程和进程
      • Binder 机制
      • 跨进程通信
        • AIDL
    • 内部原理
      • 消息循环机制
      • Binder
      • Window
      • SparseArray
      • ArrayMap
      • RecyclerView
      • App 启动流程
    • 性能优化
      • 内存
        • 内存使用优化
        • 内存泄漏
      • 启动优化
      • 缩减包大小
      • 布局优化
      • ANR
    • 打包构建
      • dex 文件
      • APK 打包流程
      • APK 签名流程
    • 架构
      • 运行时
      • Android 系统架构
      • 应用项目架构
    • 开源框架源码或原理
      • RxJava
        • 使用笔记
        • 源码解析
      • Retrofit
      • ButterKnife
      • BlockCanary
      • LeakCanary
      • OkHttp
      • 图片加载
        • Glide
        • Picasso
    • 碎片化处理
      • 屏幕适配
    • 黑科技
      • 热修复
    • Jetpack
      • Lifecycle
      • Room
      • WorkManager
    • 新动态
      • AndroidX
      • 各系统版本特性
  • 开发工具
    • 正则表达式
    • ADB
    • Git
  • Kotlin
  • Flutter
  • 关于作者
  • 致谢
由 GitBook 提供支持
在本页
  • 啥是响应式编程
  • RxJava 特点
  • RxJava 核心组件
  • 3 O
  • Operators
  • 多线程
  • Reactive Modeling on Android
  • Backpressure(背压)
  • Error Handling

这有帮助吗?

  1. Android
  2. 开源框架源码或原理
  3. RxJava

使用笔记

啥是响应式编程

使用可观察流进行的异步编程(Asynchronous programing with observable streams)

  • 异步编程

    • 耗时操作在主线程之外执行

    • 事件驱动,事件本身是异步的并且随时都有可能发生

  • 可观察 基于观察者模式,可由多个观察者同时订阅,在状态发生变化时,通知所有观察者

  • 流 事件序列

  • 可观察流 可以订阅的事件序列

  • 响应式编程 基于推送、观察者模式来异步处理数据流的方法

  • 数据是第一公民 程序可以看做由数据驱动,而不是线程按顺序执行

RxJava 特点

  • 基于观察者模式

  • 改造迭代器模式,使数据基于推送传递给观察者

  • 函数式编程

  • 简洁 借助 Lambda

  • 链式调用

  • 支持懒加载

  • 实现并发简单 切换线程十分方便

  • 异步错误处理 观察者可以进行错误处理

RxJava 核心组件

3 O

Observable,Observer and Operator

Observale 数据流,可观察对象

  • 提供了多个重载的 subscibe 方法用于注册 Observer

  • 虽然基于推送(回调),但并不是异步的。默认情况下,Observable 是同步的,事件将会在 subscribe() 执行的线程产生

  • Hot or Cold

    • Cold Observale 在每个观察者订阅时,会重新产生一遍数据

    • Hot Observable 则会按照顺序产生事件,观察者只能收到自订阅后产生的事件

  • 创建 Observable RxJava 提供了一系列接口用来创建 Observable

    • Observable.just():将出入的参数组装成一个 Observable 支持 1 到 9 个数据

    • Observable.fromArray()/Observable.fromIterable() 通过数组或可迭代集合创建一个 Observable

    • Observable.range()/Observable.rangeLong() 通过一个区间创建产生整数或 long 类型的 Observable

    • Observable.empty() 不产生数据,只调用 onCompelte()

    • Observable.error() 只调用 onError()

    • Observable.never() 不调用 Observer 的任何方法,一般用于测试

    • Observable.create() 更加灵活和基础的创建方法,可以根据需要调用 Observer 的方法

    • 懒发射

      • 以下两个创建方法中,参数提供的操作只会在订阅后才开始执行

      • Observable.fromCallable() 参数提供的函数返回要发射的数据

      • Observable.defer() 参数提供的函数返回一个 Observable(当 Observable 本身的创建也很耗时的时候,用 defer() 比较合适)

Observer 观察者,接收 Observable 发出的数据。当 onComplete()和 onError 回调后,就不会再产生事件

  • 事件回调

    • onSubscribe(Disposable d) 订阅成功后回调,Disposable 可以用来取消订阅

    • onNext(T value) 每当当 Observable 产生一个事件,都会回调

    • onComplete() Observable 不再产生数据时回调

    • onError(Throwable e) Observable 内部出错时回调

Operator

对数据进行一系列转换

  • 支持链式调用

  • 提高可读性

  • 让开发者更关注做什么而不是怎么做

  • onNext() 接收到的数据最好是最终数据,转换都应该放在 Operator 中

Operators

不要在 Operator 中修改原数据

  • 变换与过滤

    • map:将一个类型转换成另外一种类型

    • filter:将不符合条件的数据过滤掉

  • FlatMap

    • 传入的函数需要返回一个 Observable

    • 返回的 Observable 会立刻被订阅,并且产生的数据将被合并发送到 Observer

    • 如果有延迟,flatMap 产生的序列 可能和 Observable 的产生顺序不一致

  • ConcatMap

    • 与 FlatMap 类似,不过会等前一个 Observable 产生完数据,才会订阅下一个,因此不会出现数据交叉的情况

  • Skip:忽略前面 n 个数据

  • Take:只取前面 n 个数据

  • Fisrt&Last:只取第一个或最后一个数据 返回一个特殊的 Observable:Single(只产生一个数据)last 等所有数据产生完成后才会收到数据​

  • 组合

    • Observable.merge():将多个 Observable 组合成一个,数据可能会交叉产生,当其中一个 Observable 调用 onComplete()后,合并后的 Observable 就不再产生数据

    • Observable.concat():多个 Observable 按顺序订阅,前一个产生完数据后才会订阅后一个,因此产生数据不会交叉,所有 Observable 都完成后才会结束

    • Observable.zip():将多个 Observable 的数据按产生顺序组合成一个新的 Observable 序列,新 Observable 序列的长度等于包含数据最少的 Observable 的长度 如果一个 Observable 产生数据过快 ,那么会缓存产生过快的 Observable 数据并等待较慢的那个

  • 聚合

    • toList():将一个 Observable 的数据打包成一个 List,并返回 Observable<List<T>> 调用 onComplete()后才会产生 toList(),如果没有调用onComplete ,那么 toList 也不会被调用

    • reduce():将元素累积,形成一个最终结果 比如累加。不能用于将数据累积到一个可变的数据结构中

    • collect():用于将元素累积到一个集合中,需要传一个 Callable 用于创建一个集合,然后再把数据累积到集合中 比如将字符拼接到一个 StringBuilder

  • 工具性 Operator

    • doOnEach():每产生一个事件就会被回调(包括 onError onComplete)

    • donOnNext() doOnError() doOnComplete()

  • Cache:用于缓存Observable 产生的事件 可以配合 timestamp() 来为缓存事件加上时间戳,用于判断是否需要重新产生事件

  • 重用操作链

    • 将一系列操作链封装到ObservableTransformer 中,然后通过 Observable.compose()方法传入

多线程

Scheduler 的使用

  • RxJava 默认是同步的

  • Schedulers

    • Schedulers.newThread():新建一个线程

    • Schedulers.single():单线程,任务按顺序执行

    • Schedulers.from(Executor):基于提供的 Executor

    • Schedulers.computation():用于 CPU 密集型任务,使用等于 CPU 核数的线程(因为创建多个线程没有意义,反而会消耗性能)

    • Schedulers.trampoline():在当前线程将任务组织成队列执行

  • Observable.subscribeOn():控制 Observable 的创建过程在哪个 Scheduler 执行

  • Observable.observeOn():控制观察者的行为在哪个 Scheduler 执行 RxAndroid 提供了 Android 平台的线程模型,比如 AnroidSchduler.mainThread()

  • 细节

    • 链式调用中,只有第一个 subscribeOn 会生效,后续的都不会起作用 如果不想调用者修改Observable 的创建执行线程,可以先指定线程

    • observeOn 可以被调用多次,每次都会影响后面的操作 应该确保 observeOn 尽可能靠近 subscribe 方法,以确保计算操作尽肯能少的出现在主线程

  • 使用 FlatMap 进行并发:在创建 Observable 的时候通过 subscribeOn 指定执行线程 不能保证顺序

Reactive Modeling on Android

  • 非响应式方法 RxJava 提供的用来将 Observable 转换成非响应式的方法,应尽量避免使用

    • blockingFirst():阻塞调用线程,直到 Observable 产生第一个元素

    • blockingLast():阻塞调用线程,直到 Observable 产生最后一个元素

    • blockingNext():返回一个 Iterable 对象,可以用来遍历 Observable 产生的所有数据,每遍历一个数据都会阻塞调用线程

    • blockingSingle():阻塞调用线程,直到 Observable 产生一个元素以及一个后续的 onComplete 事件,否则会抛出异常

    • blockingSubscribe():阻塞调用线程,直到 Observable 产生一个终止事件(onNext 被忽略)

  • 懒加载 耗时任务应该在订阅后才开始执行

    • 使用 defer 或者 fromCallable 来包装创建 Observable 的操作即可

  • Reactive Everything

    • 处理耗时操作 网络请求、文件读写、Bitmap 处理等

      • 将耗时操作包装到 Observable 中,并制定执行线程,这样调用端就不用关心该操作是否会阻塞主线程

      • Completable、Single 和 Maybe 几种特殊的 Observable

        • Single:只产生一个元素

          • 对应 SingleObserver 只有三个回调:onSubscribe(Disposable d) 、onSuccess(T value) 和 onError(Throwable t)

        • Completable:不产生元素,只通知操作是否成功

          • 对应 CompletableObserver 回调:onSubscribe(Disposable d) 、onComplete() 和 onError(Throwable t)

        • Maybe:可能产生一个元素,也可能没有元素,可以看做 Single 和 Completable 的组合

          • 对应 MaybeObserver 回调:

            • onSubscribe(Disposable d)

            • onSuccess(T value)

            • onComplete():没有产生元素时回调

            • onError(Throwable t)

        • 与 Observable 相互转换

          • Completable、Single 和 Maybe 有 toObservable 方法

          • Observable 的 toList 方法可以转换成 Single

    • 替换回调 通过 Observable.create()方法,创建 Observable 并调用观察者的一系列方法来更新数据,替换常用的回调形式编码

      • 多点传播

        • Cold Observable to Hot Observable create() 方法返回的是一个 Cold Observable(新的观察者订阅后,会重新创建数据源并收到已产生的所有事件),有时需要将Cold Observable 转为 Hot Observable(多个观察者共享数据源,新观察者只收到自订阅后产生的数据)

          • 调用 .publish() 可以将一个 Observable 转换成 ConnectableObservable,即可转换成 Hot Observable 只调用.publish() ,观察者的 onNext() 方法并不会调用,还需要调用 .connect()方法,ConnectableObservable 才开始产生数据。refCount()方法保证只要有观察者订阅,​ConnectableObservable 就回保持与数据源的连接

          • 调用 .share() 方法也可以将一个 Cold Observable 转换成 Hot Observable,相当于调用 .publish().refCount()

        • Subjects 既可以当作数据源,又可以当做观察者。通过它也可以实现共享数据源的多点传播。

          • PublishSubject:观察者只能收到订阅后产生的数据

          • AsyncSubject:观察者只能收到 Observable 完成后的最后一个数据

          • BehaviorSubject:观察者只能收到最近产生的一个数据以及其后的所有数据

          • ReplaySubject:观察者能收到 Observable 产生过的所有旧数据以及之后产生的新数据

    • View 事件

      • 使用 RxBinding 可以将 View 的事件组织成 Observable

    • Disposable 和 Activity/Fragment 的生命周期 用于取消订阅,如果没有在合适的时机取消订阅,可能会产生内存泄漏

      • 不需要时就可以取消订阅,比如在 Activity 的 onDestroy() onPause()方法中

      • 使用 RxLiftCycle 库,可以在收到 Activity 或 Fragment 指定生命周期回调时自动取消订阅

      • 使用 Google 的 LifeCycle 库也可以

Backpressure(背压)

解决数据产生速度比消费速度快的问题。如果生产者速度过快,则减慢其速度。

  • Flowable 支持背压的 Observable

    • 支持的背压策略

      • BackpressureStrategy.ERROR:当下游观察者跟不上数据产生 速度时,抛出一个 MissingBackpressureException

      • BackpressureStrategy.BUFFER(默认策略):在下游观察者处理前,缓存已经产生但是未处理的数据(默认缓存数量为 128)

      • BackpressureStrategy.DROP:丢弃最近产生的数据

      • BackpressureStrategy.LATEST:只保留最新产生的数据

      • BackpressureStrategy.MISSING:无背压,等同于直接使用 Observable

    • 什么时候用 Observable

      • 数据量小,不太可能出现 OOM

      • 处理 GUI 事件,不会很频繁

    • 什么时候用 Flowable 需要控制数据获取量

      • 数据量很大,可以控制产生的数据量

      • 文件读写:控制读取量

      • 数据库

      • 网络流

  • Subscriber 比 Observer 多一个 onSubscribe(Subcribtion s)方法,Subcribtion 有一个 request(long)方法可以用来向数据源请求数据

  • 限流

    • throttleFirst(long,timeunit) 、throttleFirst(long,timeunit):获取一段时间内的第一个或者最后一个事件

    • sample(long,timeunit):获取一段时间内最后一个事件(采样)

  • 缓存

    • buffer:将一些数据缓存到 list

      • .buffer(int count): 缓存一定数量的数据

      • .buffer(ObservableSource boundary):以 boundary 产生的数据为边界缓存数据

      • .buffer(long timeSpan,TimeUnit unit):缓存一定时间内的数据

    • window:将一些数据缓存成 Observable 便于链式操作甚至并发(类似于 flatMap)

Error Handling

  • 产生一个异常

    • observer.onError()

    • Obserable.error()

  • Observer 的 onError() 回调 这是一个终止事件,一旦产生错误,Observable 将不会产生新数据

  • 严重的异常(如 OOM、VMError)不会传递给观察者,而是在异常产生线程直接抛出

  • 异常处理操作

    • Observable.onErrorReturnItem(T item):发生异常时不调用 onError,而是返回该方法提供的 item

    • Observable.onErrorReturn(Function valueSupplier):发生异常时,返回 valueSupplier 提供的 item 用于根据不同错误返回对应的 item

    • Observable.onErrorResumeNext(ObservableSource next):发生异常时,由另一个 Observable 继续提供数据

  • 重试操作 重试条件不满足后,继续调用Observer 的 onError

    • Observable.retry():发生异常时重新订阅,让 Observable 继续完成数据产生 可以设置重试次数以及重试条件

    • Observable.retryWhen() 可以设置每次重试的时机。例如指数延迟,下一次重试的等待时间是本次的2倍

  • 处理未传递异常 异常发生时,没有提供您 onError 实现或者订阅关系已取消

    • 未处理异常会调用 RxJavaPlugin.onError()默认打出堆栈信息

    • 可以通过 RxJavaPlugins.setErrorHandler() 设置未传递异常处理函数,这样就不会调用RxJavaPlugin.onError()(默认会 crash) 这种方式可以避免 UI 线程的崩溃,便于记录日志等

上一页RxJava下一页源码解析

最后更新于5年前

这有帮助吗?

():主要用于 IO 操作,线程数会随着任务适当增加

Schedulers.io