Android IO 框架 Okio 的实现原理,到底哪里 OK?

藏宝库编辑 2024-9-27 16:31:20 45 0 来自 中国
前言

各人好,我是小彭。
本日,我们来讨论一个 Square 开源的 I/O 框架 Okio,我们最开始打仗到 Okio 框架照旧源于 Square 家的 OkHttp 网络框架。那么,OkHttp 为什么要使用 Okio,它相比于 Java 原生 IO 有什么区别和上风?本日我们就围绕这些标题睁开。
本文源码基于 Okio v3.2.0。
头脑导图
1.png 1. 说一下 Okio 的上风?

相比于 Java 原生 IO 框架,我以为 Okio 的上风紧张体如今 3 个方面:

  • 1、精简且全面的 API: 原生 IO 使用装饰模式,比方使用 BufferedInputStream 装饰 FileInputStream 文件输入流,可以加强流的缓冲功能。但是原生 IO 的装饰器过于巨大,须要区分字节、字符流、字节数组、字符数组、缓冲等多种装饰器,而这些恰恰又是最常用的基础装饰器。相较之下,Okio 直接在 BufferedSource 和 BufferedSink 中聚合了原生 IO 中全部基础的装饰器,使得框架更加精简;
  • 2、基于共享的缓冲区计划: 由于 IO 体系调用存在上下文切换的性能斲丧,为了镌汰体系调用次数,应用层每每会接纳缓冲区计谋。但是缓冲区又会存在副作用,当数据从一个缓冲区转移到另一个缓冲区时须要拷贝数据,这种内存中的拷贝显得没有须要。而 Okio 接纳了基于共享的缓冲区计划,在缓冲区间转移数据只是共享 Segment 的引用,而镌汰了内存拷贝。同时 Segment 也接纳了对象池计划,镌汰了内存分配和回收的开销;
  • 3、超机遇制: Okio 增补了部门 IO 操纵不支持超时检测的缺陷,而且 Okio 不但支持单次 IO 操纵的超时检测,还支持包含多次 IO 操纵的复合使命超时检测。
下面,我们将从这三个上风睁开分析:
2. 精简的 Okio 框架

先用一个表格总结 Okio 框架中紧张的范例:
范例形貌Source输入流Sink输出流BufferedSource缓存输入流接口,实现类是 RealBufferedSourceBufferedSink缓冲输出流接口,实现类是 RealBufferedSinkBuffer缓冲区,由 Segment 链表组成Segment数据片断,多个片断组成逻辑上一连数据ByteStringString 类Timeout超时控制2.1 Source 输入流 与 Sink 输出流

在 Java 原生 IO 中有四个基础接口,分别是:

  • 字节流: InputStream 输入流和 OutputStream 输出流;
  • 字符流: Reader 输入流和 Writer 输出流。
而在 Okio 更加精简,只有两个基础接口,分别是:

  • 流: Source 输入流和 Sink 输出流。
Source.kt
interface Source : Closeable {    // 从输入流读取数据到 Buffer 中(Buffer 等价于 byte[] 字节数组)    // 返回值:-1:输入内容结束    @Throws(IOException::class)    fun read(sink: Buffer, byteCount: Long): Long    // 超时控制(具体分析见后续文章)    fun timeout(): Timeout    // 关闭流    @Throws(IOException::class)    override fun close()}Sink.java
actual interface Sink : Closeable, Flushable {    // 将 Buffer 的数据写入到输出流中(Buffer 等价于 byte[] 字节数组)    @Throws(IOException::class)    actual fun write(source: Buffer, byteCount: Long)    // 清空输出缓冲区    @Throws(IOException::class)    actual override fun flush()    // 超时控制(具体分析见后续文章)    actual fun timeout(): Timeout    // 关闭流    @Throws(IOException::class)    actual override fun close()}2.2 InputStream / OutputStream 与 Source / Sink 互转

在功能上,InputStream - Source 和 OutputStream - Sink 分别是等价的,而且是相互兼容的。结合 Kotlin 扩展函数,两种接口之间的转换会非常方便:

  • source(): InputStream 转 Source,实现类是 InputStreamSource;
  • sink(): OutputStream 转 Sink,实现类是 OutputStreamSink;
比力不明白的是: Okio 没有提供 InputStreamSource 和 OutputStreamSink 转回 InputStream 和 OutputStream 的方法,而是须要先转换为 BufferSource 与 BufferSink,再转回 InputStream 和 OutputStream。

  • buffer(): Source 转 BufferedSource,Sink 转 BufferedSink,实现类分别是 RealBufferedSource 和 RealBufferedSink。
示例代码
// 原生 IO -> Okioval source = FileInputStream(File("")).source()val bufferSource = FileInputStream(File("")).source().buffer()val sink = FileOutputStream(File("")).sink()val bufferSink = FileOutputStream(File("")).sink().buffer()// Okio -> 原生 IOval inputStream = bufferSource.inputStream()val outputStream = bufferSink.outputStream()JvmOkio.kt
// InputStream -> Sourcefun InputStream.source(): Source = InputStreamSource(this, Timeout())// OutputStream -> Sinkfun OutputStream.sink(): Sink = OutputStreamSink(this, Timeout())private class InputStreamSource(    private val input: InputStream,    private val timeout: Timeout) : Source {    override fun read(sink: Buffer, byteCount: Long): Long {        if (byteCount == 0L) return 0        require(byteCount >= 0) { "byteCount < 0: $byteCount" }        try {            // 同步超时监控(具体分析见后续文章)            timeout.throwIfReached()            // 读入 Buffer            val tail = sink.writableSegment(1)            val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit).toInt()            val bytesRead = input.read(tail.data, tail.limit, maxToCopy)            if (bytesRead == -1) {                if (tail.pos == tail.limit) {                    // We allocated a tail segment, but didn't end up needing it. Recycle!                    sink.head = tail.pop()                    SegmentPool.recycle(tail)                }                return -1            }            tail.limit += bytesRead            sink.size += bytesRead            return bytesRead.toLong()        } catch (e: AssertionError) {            if (e.isAndroidGetsocknameError) throw IOException(e)            throw e        }  }  override fun close() = input.close()  override fun timeout() = timeout  override fun toString() = "source($input)"}private class OutputStreamSink(    private val out: OutputStream,    private val timeout: Timeout) : Sink {    override fun write(source: Buffer, byteCount: Long) {        checkOffsetAndCount(source.size, 0, byteCount)        var remaining = byteCount        // 写出 Buffer        while (remaining > 0) {            // 同步超时监控(具体分析见后续文章)            timeout.throwIfReached()            // 取有用数据量和剩余输出量的较小值            val head = source.head!!            val toCopy = minOf(remaining, head.limit - head.pos).toInt()            out.write(head.data, head.pos, toCopy)            head.pos += toCopy            remaining -= toCopy            source.size -= toCopy            // 指向下一个 Segment            if (head.pos == head.limit) {                source.head = head.pop()                SegmentPool.recycle(head)            }        }    }    override fun flush() = out.flush()    override fun close() = out.close()    override fun timeout() = timeout    override fun toString() = "sink($out)"}Okio.kt
// Source -> BufferedSourcefun Source.buffer(): BufferedSource = RealBufferedSource(this)// Sink -> BufferedSinkfun Sink.buffer(): BufferedSink = RealBufferedSink(this)2.3 BufferSource 与 BufferSink

在 Java 原生 IO 中,为了镌汰体系调用次数,我们一样平常不会直接调用 InputStream 和 OutputStream,而是会使用 BufferedInputStream 和 BufferedOutputStream 包装类增长缓冲功能。
比方,我们希望接纳带缓冲的方式读取字符格式的文件,则须要先将文件输入流包装为字符流,再包装为缓冲流:
Java 原生 IO 示例
// 第一层包装FileInputStream fis = new FileInputStream(file);// 第二层包装InputStreamReader isr = new InputStreamReader(new FileInputStream(file), "UTF-8");// 第三层包装BufferedReader br = new BufferedReader(isr);String line;while ((line = br.readLine()) != null) {    ...}// 省略 close同理,我们在 Okio 中一样平常也不会直接调用 Source 和 Sink,而是会使用 BufferedSource 和 BufferedSink 包装类增长缓冲功能:
Okio 示例
val bufferedSource = file.source()/*第一层包装*/.buffer()/*第二层包装*/while (!bufferedSource.exhausted()) {    val line = bufferedSource.readUtf8Line();    ...}// 省略 close网上有资料说 Okio 没有使用装饰器模式,以是类结构更简朴。 这么说实在不太准确,装饰器模式本身并不是缺点,而且从 BufferedSource 和 BufferSink 可以看出 Okio 也使用了装饰器模式。 严格来说是原生 IO 的装饰器过于巨大,而 Okio 的装饰器更加精简。
比如原生 IO 常用的流就有这么多:

  • 原始流: FileInputStream / FileOutputStream 与 SocketInputStream / SocketOutputStream;
  • 基础接口(区分字节流和字符流): InputStream / OutputStream 与 Reader / Writer;
  • 缓存流: BufferedInputStream / BufferedOutputStream 与 BufferedReader / BufferedWriter;
  • 根本范例: DataInputStream / DataOutputStream;
  • 字节数组和字符数组: ByteArrayInputStream / ByteArrayOutputStream 与 CharArrayReader / CharArrayWriter;
  • 此处省略一万个字。
原生 IO 框架
而这么多种流在 Okio 里还剩下多少呢?

  • 原始流: FileInputStream / FileOutputStream 与 SocketInputStream / SocketOutputStream;
  • 基础接口: Source / Sink;
  • 缓存流: BufferedSource / BufferedSink。
Okio 框架
3.png 就问你服不平?
而且你看哈,这些都是平常业务开辟中最常见的根本范例,原生 IO 把它们都拆分开了,让标题复杂化了。反观 Okio 直接在 BufferedSource 和 BufferedSink 中聚合了原生 IO 中根本的功能,而不再须要区分字节、字符、字节数组、字符数组、基础范例等等装饰器,确实让框架更加精简。
BufferedSource.kt
actual interface BufferedSource : Source, ReadableByteChannel {    actual val buffer: Buffer    // 读取 Int    @Throws(IOException::class)    actual fun readInt(): Int    // 读取 String    @Throws(IOException::class)    fun readString(charset: Charset): String    ...    fun inputStream(): InputStream}BufferedSink.kt
actual interface BufferedSink : Sink, WritableByteChannel {    actual val buffer: Buffer    // 写入 Int    @Throws(IOException::class)    actual fun writeInt(i: Int): BufferedSink    // 写入 String    @Throws(IOException::class)    fun writeString(string: String, charset: Charset): BufferedSink    ...    fun outputStream(): OutputStream}2.4 RealBufferedSink 与 RealBufferedSource

BufferedSource 和 BufferedSink 照旧接口,它们的真正的实现类是 RealBufferedSource 和 RealBufferedSink。可以看到,在实现类中会创建一个 Buffer 缓冲区,在输入和输出的时间,都会借助 “Buffer 缓冲区” 镌汰体系调用次数。
RealBufferedSource.kt
internal actual class RealBufferedSource actual constructor(    // 装饰器模式    @JvmField actual val source: Source) : BufferedSource {    // 创建输入缓冲区    @JvmField val bufferField = Buffer()    // 带缓冲地读取(全部数据)    override fun readString(charset: Charset): String {        buffer.writeAll(source)        return buffer.readString(charset)    }    // 带缓冲地读取(byteCount)    override fun readString(byteCount: Long, charset: Charset): String {        require(byteCount)        return buffer.readString(byteCount, charset)    }}RealBufferedSink.kt
internal actual class RealBufferedSink actual constructor(    // 装饰器模式    @JvmField actual val sink: Sink) : BufferedSink {    // 创建输出缓冲区    @JvmField val bufferField = Buffer()    // 带缓冲地写入(全部数据)    override fun writeString(string: String, charset: Charset): BufferedSink {        buffer.writeString(string, charset)        return emitCompleteSegments()    }    // 带缓冲地写入(beginIndex - endIndex)    override fun writeString(        string: String,        beginIndex: Int,        endIndex: Int,        charset: Charset    ): BufferedSink {        buffer.writeString(string, beginIndex, endIndex, charset)        return emitCompleteSegments()    }}至此,Okio 根本框架分析结束,用一张图总结:
Okio 框架
4.png 3. Okio 的缓冲区计划

3.1 使用缓冲区镌汰体系调用次数

在操纵体系中,访问磁盘和网卡等 IO 操纵须要通过体系调用来实行。体系调用本质上是一种软制止,历程会从用户态陷入内核态实行制止处理处罚步调,完成 IO 操纵后再从内核态切换回用户态。
可以看到,体系调用存在上下文切换的性能斲丧。为了镌汰体系调用次数,应用层每每会接纳缓冲区计谋:
以 Java 原生 IO BufferedInputStream 为例,会通过一个 byte[] 数组作为数据源的输入缓冲,每次读取数据时会读取更多数据到缓冲区中:

  • 如果缓冲区中存在有用数据,则直接从缓冲区数据读取;
  • 如果缓冲区不存在有用数据,则先实行体系调用添补缓冲区(fill),再从缓冲区读取数据;
  • 如果要读取的数据量大于缓冲区容量,就会跳过缓冲区直接实行体系调用。
输出流 BufferedOutputStream 也类似,输出数据时会优先写到缓冲区,当缓冲区满或者手动调用 flush() 时,再实行体系调用写出数据。
伪代码
// 1\. 输入fun read(byte[] dst, int len) : Int {    // 缓冲区有用数据量    int avail = count - pos    if(avail <= 0) {        if(len >= 缓冲区容量) {            // 直接从输入流读取            read(输入流 in, dst, len)        }        // 添补缓冲区        fill(数据源 in, 缓冲区)    }    // 本次读取数据量,不凌驾可用容量    int cnt = (avail < len) ? avail : len?    read(缓冲区, dst, cnt)    // 更新缓冲区索引    pos += cnt    return cnt}// 2\. 输出fun write(byte[] src, len) {    if(len > 缓冲区容量) {        // 先将缓冲区写出        flush(缓冲区)        // 直接写出数据        write(输出流 out, src, len)    }    // 缓冲区剩余容量    int left = 缓冲区容量 - count    if(len > 缓冲区剩余容量) {        // 先将缓冲区写出        flush(缓冲区)    }    // 将数据写入缓冲区    write(缓冲区, src, len)    // 更新缓冲区已添加数据容量    count += len}3.2 缓冲区的副作用

简直,缓冲区计谋能有用地镌汰体系调用次数,不至于读取一个字节都须要实行一次体系调用,大多数情况下体现良好。 但思量一种 “双流操纵” 场景,即从一个输入流读取,再写入到一个输出流。回顾刚才讲的缓存计谋,此时的数据转移过程为:

  • 1、从输入流读取到缓冲区;
  • 2、从输入流缓冲区拷贝到 byte[](拷贝)
  • 3、将 byte[] copy 到输出流缓冲区(拷贝);
  • 4、将输出流缓冲区写入到输出流。
如果这两个流都使用了缓冲区计划,那么数据在这两个内存缓冲区之间相互拷贝,就显得没有须要。
3.3 Okio 的 Buffer 缓冲区

Okio 固然也有缓冲区计谋,如果没有就会存在频仍体系调用的标题。
Buffer 是 RealBufferedSource 和 RealBufferedSink 的数据缓冲区。固然在实现上与原生 BufferedInputStream 和 BufferedOutputStream 不一样,但在功能上是一样的。区别在于:

  • 1、BufferedInputStream 中的缓冲区是 “一个固定长度的字节数组” ,数据从一个缓冲区转移到另一个缓冲区须要拷贝;
  • 2、Buffer 中的缓冲区是 “一个 Segment 双向循环链表” ,每个 Segment 对象是一小段字节数组,依靠 Segment 链表的序次组成逻辑上的一连数据。这个 Segment 片断是 Okio 高效的关键。
Buffer.kt
actual class Buffer : BufferedSource, BufferedSink, Cloneable, ByteChannel {    // 缓冲区(Segment 双向链表)    @JvmField internal actual var head: Segment? = null    // 缓冲区数据量    @get:JvmName("size")    actual var size: Long = 0L        internal set    override fun buffer() = this    actual override val buffer get() = this}对比 BufferedInputStream:
BufferedInputStream.java
public class BufferedInputStream extends FilterInputStream {    // 缓冲区的默认大小(8KB)    private static int DEFAULT_BUFFER_SIZE = 8192;    // 输入缓冲区(固定长度的数组)    protected volatile byte buf[];    // 有用数据起始位,也是读数据的起始位    protected int pos;    // 有用数据量,pos + count 是写数据的起始位    protected int count;    ...}3.4 Segment 片断与 SegmentPool 对象池

Segment 中的字节数组是可以 “共享” 的,当数据从一个缓冲区转移到另一个缓冲区时,可以共享数据引用,而不肯定须要拷贝数据。
Segment.kt
internal class Segment {    companion object {        // 片断的默认大小(8KB)        const val SIZE = 8192        // 最小共享阈值,凌驾 1KB 的数据才会共享        const val SHARE_MINIMUM = 1024    }    // 底层数组    @JvmField val data: ByteArra    // 有用数据的起始位,也是读数据的起始位    @JvmField var pos: Int = 0    // 有用数据的结束位,也是写数据的起始位    @JvmField var limit: Int = 0    // 共享标志位    @JvmField var shared: Boolean = false    // 宿主标志位    @JvmField var owner: Boolean = false    // 后续指针    @JvmField var next: Segment? = null    // 前驱指针    @JvmField var prev: Segment? = null    constructor() {        // 默认构造 8KB 数组(为什么默认长度是 8KB)        this.data = ByteArray(SIZE)        // 宿主标志位        this.owner = true        // 共享标志位        this.shared = false    }}别的,Segment 还使用了对象池计划,被回收的 Segment 对象会缓存在 SegmentPool 中。SegmentPool 内部维护了一个被回收的 Segment 对象单链表,缓存容量的最大值是 MAX_SIZE = 64 * 1024,也就相称于 8 个默认 Segment 的长度:
SegmentPool.kt
// object:全局单例internal actual object SegmentPool {    // 缓存容量    actual val MAX_SIZE = 64 * 1024    // 头节点    private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false)    ...}Segment 表现图
4. 总结


  • 1、Okio 将原生 IO 多种基础装饰器聚合在 BufferedSource 和 BufferedSink,使得框架更加精简;
  • 2、为了镌汰体系调用次数的同时,应用层 IO 框架会使用缓存区计划。而 Okio 使用了基于共享 Segment 的缓冲区计划,镌汰了在缓冲区间转移数据的内存拷贝;
  • 3、Okio 增补了部门 IO 操纵不支持超时检测的缺陷,而且 Okio 不但支持单次 IO 操纵的超时检测,还支持包含多次 IO 操纵的复合使命超时检测。
关于 Okio 超机遇制的具体分析,我们在 下一篇文章 里讨论。请关注。
作者:彭旭锐
链接:https://juejin.cn/post/7167757174502850597
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2024-10-18 18:28, Processed in 0.184128 second(s), 35 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表