【Android】Okio 源码解析 : 一套精简高效的 I/O 库


Okio 源码解析

从前面的 OkHttp 源码解析中我们可以知道,OkHttp 中的 I/O 都不是通过我们平时所使用的 IOStream 来实现,而是使用了 Okio 这个第三方库,那它与寻常的 IOStream 有什么区别呢?让我们来分析一下它的源码。

Okio 中有两个非常重要的接口——Sink 以及 Source,它们都继承了 Closeable,其中 Sink 对应了我们原来所使用的 OutputStream,而 Source 则对应了我们原来所使用的 InputStream

Okio 的入口就是Okio 类,它是一个工厂类,可以通过它内部的一些 static 方法来创建 SinkSource 等对象。

Sink

Sink 实际上只是一个接口,让我们看看 Sink 中有哪些方法:

可以看到,它主要包含了 writeflushtimeoutclose 这几个方法,我们可以通过 Okio.sink 方法基于 OutputStream 获取一个 Sink

这里构建并实现了一个 Sink 的匿名内部类并返回,主要实现了它的 write 方法,剩余方法都是简单地转调到 OutputStream 的对应方法。

write 方法中,首先进行了一些状态检验,这里貌似在 Timeout 类中实现了对超时的处理,我们稍后再分析。之后从 Buffer 中获取了一个 Segment,并从中取出数据,计算出写入的量后将其写入 Sink 所对应的 OutputStream

Segment 采用了一种类似链表的形式进行连接,看来 Buffer 中维护了一个 Segment 链表,代表了数据的其中一段。这里将 Buffer 中的数据分段取出并写入了 OutputStream 中。

最后,通过 SegmentPool.recycle 方法对当前 Segment 进行回收。

从上面的代码中我们可以获取到如下信息:

  1. Buffer 其实就是内存中的一段数据的抽象,其中通过 Segment 以链表的形式保存用于存储数据。
  2. Segment 存储数据采用了分段的存储方式,因此获取数据时需要分段从 Segment 中获取数据。
  3. 有一个 SegmentPool 池用于实现 Segment 的复用。
  4. Segment 的使用有点类似链表。

Source

SourceSink 一样,也仅仅是一个接口:

Okio 中可以通过 source 方法根据 InputStream 创建一个 Source

这里构建并实现了 Source 的一个匿名内部类并返回,应该就是 Source 的默认实现了。

它除了 read 方法其他都只是简单地调用了 InputStream 的对应方法,我们重点看 read 方法:

首先它进行了一些相关的状态检测,之后通过 sink.writeableSegment 获取到了一个可以写入的 Segment。之后从 InputStream 中读取数据向 Segment 中写入,读取的大小被限制为了 8192 个字节。

Buffer

BufferSinkSource 中都担任了一个十分重要的地位,它对应了我们内存中存储的数据,对这些数据进行了抽象。下面让我们对 Buffer 进行分析:

Buffer 虽然是我们内存中数据的抽象,但数据实际上并不是存储在 Buffer 中的,它在内部维护了一个 Segment 的循环链表,Segment 才是真正存储数据的地方。它通过 Segment 将数据分成了几段,通过链表进行连接。在 Buffer 内部封装了许多 I/O 操作,都是在对 Segment 中的数据进行处理。

为什么要使用 Segment 对数据进行分段存储而不直接存储整个数据呢?由于数据是分段存放的,这些段中的某一部分可能与另一个 Buffer 中的数据恰好是相同的,此时就体现出了 Segment 的灵活性,我们不需要将数据拷贝到另一个 Buffer 中,只需要将其 Segment 指向这个重复段的 Segment 即可。同时,对于一些如将数据从 Source 转移到 Sink 中这种情况,也不需要进行拷贝,只需要将链表指向我们的 Segment 即可,极大地提高了效率,同时节省了内存空间。

Segment

我们首先看一下存储数据的 Segment,它代表了数据中的一段,是一个双向的循环链表,主要有以下的参数:

可以看到,其中 pos 代表了下一次读取的起始位置,而 limit 代表了下一次写入的起始位置,我们可以根据它们两个值将整个 Segment 的空间分为如图的三段:

image-20190921122845126

其中已读区域的数据我们以后都不会再用到,已写入区域的数据正在等待读取,而空闲区域还没有填入数据,可以进行写入。

共享机制

同时,Segment 还支持了对数据的共享,通过 sharedowner 字段分别表明了数据是否已被共享以及其是否属于当前 Segment。同时它提供了两种拷贝方式: sharedCopy 以及 unsharedCopy

unsharedCopy 返回了一个新的 Segment,并将 data 数组通过 clone 方法拷贝到了新 Segment 中:

sharedCopy 同样返回了一个新的 Segment,但其 data 数组是与新 Segment 进行共享的:

同时通过注释我们可以看到,当数据共享后,为了保证安全性,禁止了写入操作。同时将被拷贝的 Segment 也标记为了 shared,从而防止其被回收。

这样的设计同样是为了减少拷贝,从而提高 I/O 的效率。

合并与分割

Segment 还支持了与前一个 Segment 的合并以及对自身的分割操作,从而使得使用者能够更灵活地操作。

合并操作会在当前 Segment 与它的前一个节点都没有超过其大小的一半时,将二者的数据进行合并,并将当前 Segment 进行回收,从而增大内存的利用效率:

而分割操作则会将 Segment 中的数据分割为 [pos, pos+byteCount)[pos+byteCount, limit) 的两段:

这里首先对不同数据段的数据进行了处理,如果数据段大于了 1024 字节,则将数据通过共享交给了分割的前一个节点,两端 Segment 公用同一个 data 数组,否则通过拷贝的形式构建一个新的 Segment

为什么这里需要对数据大小的不同采用不同的处理方式呢?我们可以看到上面的注释,里面给出了答案:首先,为了避免拷贝数据带来的性能开销,加入了共享 Segment 的功能。但是由于共享的数据是只读的,如果有很多很短的数据段的话,使用的表现并不会很好,因此只有当拷贝的数据量比较大时,才会进行 Segment 的共享。

之后,将二者的 poslimit 都进行了设置。由于 pos 之前的部分及 limit 之后的部分都不会影响到我们正常的读取和写入,因此我们可以不用关心它们目前的状态,没必要再对它们进行一些如填充零之类的操作。

SegmentPool

同时,Okio 还使用了 SegmentPool 来实现一个对象池,从而避免 Segment 频繁地创建及销毁所带来的性能开销。

SegmentPool 的实现十分简单,它内部维护了一个单链表,用于存储被回收存在池中的 Segment,其最大容量被限制在了 64 k。

当需要 Segment 时,可以通过 take 方法来获取一个被回收的对象:

它会在单链表中找到一个空闲的 Segment 并初始化后返回。若当前链表中没有对象,则会创建一个新的 Segment

Segment 使用完毕时,首先可以通过 Segmentpop 操作将其从链表中移除,之后可以调用 SegmentPool.recycle 方法对其进行回收:

回收 Segment 时,不会对只读的 Segment 进行回收,若 Segment 个数超过了上限,则不会对该 Segment 进行回收。

数据转移

Okio 与 java.io 有个很大的不同,体现在 Buffer 的数据转移上,我们可以通过其 copyTo 方法来完成数据的转移。之所以叫转移,因为它相对于复制来说,是有很大的数据提升的。例如我们可以看到两个 Buffer 之间的数据转移是如何进行的:

从上面的代码中可以看出,实际上这个过程是通过了 Segment 共享实现的,因此不需要进行拷贝,极大地提高了数据转移的效率。

BufferedSource

我们可以通过 Okio.buffer 方法对一个普通的 Source 进行包装,获取一个具有缓冲能力的 BufferSource,它是一个接口,定义了一系列读取的方法:

它主要有两个实现类:BufferRealBufferedSource。其中 RealBufferedSource 显然是我们通过 buffer 方法包装后得到的类,而 Buffer 实际上对 BufferSource 也进行了实现,通过一系列 read 方法可以从 Segment 中读取处对应的数据。而我们的 RealBufferedSource 则是 Source 的一个包装类,并且其维护了一个 Buffer,从而提高 Input 的效率。我们先分析其思路,再来讨论为什么这样能提高 Input 的效率。

我们可以首先看到 RealBufferedSource 的读取方法,这里以 readByteArray 方法举例:

这里首先调用了 require 方法,之后再从 buffer 中将数据读出,看来在 require 中将数据先读取到了 buffer 中。

我们看到 require 方法:

它实际上转调到了 request 方法:

request 方法中,不断地向 buffer 中读取,每次读取 Segment.SIZE 也就是 8192 个字节。也就是它读取的量是 byteCount 以 8192 字节向上取整。为什么它不刚好读取 byteCount 个字节,要读满 8192 个字节呢?

这就是一种预取思想,因为 I/O 操作往往是非常频繁的,如果进行了一次读取,那就很有可能还会进行下一次读取,因此我们预先把它下一次可能读取的部分一起读取出来,这样下次读取时,就不需要再对系统进行请求以获取数据了,可以直接从我们的 buffer 中拿到。这就是为什么说加入 buffer 提高了我们 I/O 的效率。

可能还有人会问,为什么这样能提高 I/O 效率呢,不都是读了一样的量么?这个就涉及到一些操作系统的知识了。在现代的操作系统中,我们的程序往往运行在用户态,而用户态实际上是没有进行 I/O 的权限的,因此往往都是向操作系统发起请求,切换到内核态,再进行 I/O,完成后再次回到用户态。这样的用户态及内核态的切换实际上是非常耗时的,并且这个过程中也伴随着拷贝。因此采用上面的 buffer 可以有效地减少我们的这种系统 I/O 调用,加快我们的效率。

BufferedSink

我们同样可以通过 Okio.buffer 方法对一个普通的 Sink 进行包装,从而获取一个带有 buffer 缓冲能力的 BufferedSinkBufferedSink 也是一个接口 ,内部定义了一系列写入的方法:

BufferedSink 同样有两个实现类:BufferRealBufferedSink,我们可以先看到 RealBufferedSink,它是一个 Sink 的包装类,并且内部维护了一个 Buffer

write

我们先看看其写入方法:

这里拿了一个简单的写入 byte[] 的方法进行了举例,它首先将数据写入了 buffer 中,之后调用了 emitCompleteSegments 方法。可以看到这里并没有对 sink真正进行写入,那写入究竟是在哪里进行的呢?我们看看 emitCompleteSegments 方法中做了什么:

这里首先调用 buffer.completeSegmentByteCount 方法获取到了 buffer 中已写入但未被读取的部分字节数(只包括已经被写满了的 Segment 中的),之后调用 sink.write 将其写入到了 sink 中。

这里其实很奇怪,按道理来说 buffer 的作用是通过缓存来进行一些优化,但这个方法将数据写入 buffer 后,数据又立即被写入到了 sink 中。这样相比直接写入到 sink 中,反而会带来性能的损耗啊。这里为什么要这样做呢?

我看到这里时对这段也比较奇怪,但考虑到 Okio 的整体设计来说,应该是把 Buffer 当做了一个数据统一的中转站,将读写的优化统一放在了 Buffer 中进行,因此考虑到整体的一致性,将 RealBufferedSink 也采用了通过 Buffer 中转的方式编写,应该算是一种妥协吧。并且采用 Buffer 还有好处就是,一份数据既可以用于读也可以用于写。

flush

RealBufferedSink 还支持了 flush 操作,通过 flush 方法可以将缓冲区的所有数据写入 sink 中:

emit

RealBufferedSink 还具有 emit 功能,分别是 emitCompleteSegments 方法及 emit 方法,前者是将所有已填满的 Segment 中已写入未读取的数据写入 sink,后者则是将 buffer 中所有已写入未读取数据写入 sink(类似 flush):

Timeout 超时机制

Okio 中通过 Timeout 类实现了 SinkSource 的超时机制,在 Sink 的写入与 Source 的读取时对超时进行判断,如果超时则中断写入等操作。其中对于包装了普通 InputStream / OutputStream 的使用了普通的 Timeout,而对于对 Socket 进行了包装的则使用 AsyncTimeout

Timeout

我们先对普通的 Timeout 进行研究,Timeout 中主要有两个值,timeoutdeadline ,分别代表了 wait 的最大等待时间与完成某个工作的超时时间。

deadline

对于 deadline,我们可以通过 deadline 方法进行设定:

之后,在每个需要检查超时的地方需要调用该 TimeoutthrowIfReached 方法(如 Sinkwrite 方法):

这里很简单,就是进行时间的校验,若达到了设定的时间,则抛出异常从而中断后续操作。

timeout

同时,Timeout 还实现了对 monitor 进行 wait 的超时机制,通过 waitUntilNotified 方法可以等待 monitornotify,若等待的过程超过了 Timeout 所设定的时间或当前线程被中断,则会抛出异常,从而避免一直进行等待。并且,该方法需要在 synchronized 代码块中调用,以保证线程安全

在我们构造了一个 Timeout 后,可以使用 timeout 方法对其 wait 超时时间进行设定:

这里主要是将 timeoutNanos 设置为了对应的值。接着我们看到 waitUntilNotified 方法:

AsyncTimeout

AsyncTimeoutTimeout 的子类,接下来我们看看 AsyncTimeout 是如何对 Socket 中的超时进行处理的。

首先可以看到 AsyncTimeout 中保存了一个 head 及一个 next 引用,显然这里是有一个链表存储的 AsyncTimeout 队列的:

这里感觉与 MessageQueue 有点相似,猜测 AsyncTimeout 会根据超时的时间按序存储在队列中。

并且从 AsyncTimeout 的 JavaDoc 中可以看到,它需要使用者在异步的事件开始时调用 enter 方法,结束时调用 exit 方法。同时它在背后开辟了一个线程对超时进行定时检查。

enter & exit

让我们先看到 enter 方法:

上面主要是将其 inQueue 设置为了 true,之后调用 scheduleTimeout 方法对超时进行定时检查。我们暂时先不关注 scheduleTimeout 的具体实现。

接着我们看到 exit 方法:

这里也非常简单,就是将 inQueue 设置为了 false,并调用 cancelScheduledTimeout 方法停止前面的定时校验线程。

scheduleTimeout

我们接下来看看这个定时校验的具体实现,我们先看到 scheduleTimeout 方法:

上面的逻辑主要分为以下几步:

  1. 若队列中还没有节点,构造一个头节点并且启动 WatchdogWatchdog 是一个 Thread 的子类,也就是我们的定时扫描线程。
  2. 计算该 Timeout 的超时时间,取了 timeoutdeadline 的最小值
  3. 将该 timeout 按剩余时间从小到大的顺序插入队列中
  4. 若插入的位置是队列的头部,则进行 notify(这里还无法了解到意图,我们可以往后看看)

cancelScheduledTimeout

接着我们看看 cancelScheduledTimeout 做了些什么:

这里很简单,就是将该 AsyncTimeout 从队列中移除,若返回 true,则代表超时已经发生,若返回 false,则代表超时还未发生,该 Timeout 被移除。这个返回值同样反映到了我们的 exit 方法返回值中。

Watchdog

接着我们看看 Watchdog 究竟是如何对超时进行检测的:

Wachdog 中不断调用 awaitTimeout 方法尝试获取一个可以停止的 Timeout,之后调用了其 timeOut 方法通知外部已超时。

awaitTimeout

我们可以看看 awaitTimeout 做了什么:

这里主要有以下几步:

  1. 如果队列为空,wait 直到有新的节点加入队列
  2. 计算节点需要 wait 的时间并 wait 对应时间
  3. 时间到后,说明该节点超时,将其移出队列

通过这里的代码,我们就知道为什么前面在链表头部加入节点时需要进行一次 notify 了,主要有两个目的:

  1. 若队列中没有元素,可以通过 notify通知此处有新元素加入队列。
  2. 由于插入在头部,说明其比后面的节点的需要等待时间更少,因此需要停止前一次 wait 来计算该新的 Timeout 所需要的等待时间,并对其进行超时处理。

这里的处理和 Android 中 MessageQueue 的设计还是有异曲同工之妙的,我们可以学习一波。

sink & source

AsyncTimeout 还实现了 sinksource 方法来实现了支持 AsyncTimeout 超时机制的 SinkSource,主要是通过在其各种操作前后分别调用 enterexit。下面以 Sink 为例:

比较简单,这里就不做太多解释了。

总结

Okio 是一套基于 java.io 进行了一系列优化的十分优秀的 I/O 库,它通过引入了 Segment 机制大大降低了数据迁移的成本,减少了拷贝的次数,并且对 java.io 繁琐的体系进行了简化,使得整个库更易于使用。在 Okio 中还实现了很多有其它功能的 SourceSink,感兴趣的读者可以自行翻阅一下源码。同时各位可以去回顾一下前面的 OkHttp 源码解析中,OkHttp 是如何使用 Okio 进行 Socket 的数据写入及读取的。


Android Developer in GDUT