Okio源码分析
概述
Okio 作为 Okhttp 底层 io 库,它补充了 java.io 和 java.nio 的不⾜,使访问、存储和处理数据更加容易。Okio 的特点如下:
okio 是⼀个由 square 公司开发的开源库,它弥补了 Java.io 和 java.nio 的不⾜,能够更⽅便快速的读取、存储和处理数据。
okio 有⾃⼰的流类型 Source 和 Sink,对应于 java.io 的 InputStream 和 OutputStream。
okio 内部引⼊了 ByteString 和 Buffer,提升了效率和性能。
okio 引⼊了超时机制。
okio 规模不⼤,代码精巧,是源码学习的好素材
强烈建议⼤家阅读 okio 的⽂档说明:。本⽂代码介绍基于版本 1.17.4。
流(Stream)
是指在计算机的输⼊输出操作中各部件之间的数据流动。按照数据的传输⽅向,流可分为输⼊流与输出流。Java语⾔⾥的流序列中的数据既可以是未经加⼯的原始⼆进制数据,也可以是经过⼀定编码处理后符合某种特定格式的数据。
1.输⼊输出流
在Java中,把不同类型的输⼊输出源抽象为流,其中输⼊和输出的数据称为数据流(Data Stream)。数据流是Java程序发送和接收数据的⼀个通道,数据流中包括输⼊流(Input Stream)和输出流(Output Stream)。通常应⽤程序中使⽤输⼊流读出数据,输出流写⼊数据。流式输⼊、输出的特点是数据的获取和发送均沿数据序列顺序进⾏。相对于程序来说,输出流是往存储介质或数据通道写⼊数据,⽽输⼊流是从存储介质或数据通道中读取数据,⼀般来说关于流的特性有下⾯⼏点:
先进先出,最先写⼊输出流的数据最先被输⼊流读取到。
顺序存取,可以⼀个接⼀个地往流中写⼊⼀串字节,读出时也将按写⼊顺序读取⼀串字节,不能随机访问中间的数据。
只读或只写,每个流只能是输⼊流或输出流的⼀种,不能同时具备两个功能,在⼀个数据传输通道中,如果既要写⼊数据,⼜要读取数据,则要分别提供两个流。
2.缓冲流
为了提⾼数据的传输效率,引⼊了缓冲流(Buffered Stream)的概念,即为⼀个流配备⼀个缓冲区(Buffer),⼀个缓冲区就是专门⽤于传送数据的⼀块内存。
当向⼀个缓冲流写⼊数据时,系统将数据发送到缓冲区,⽽不是直接发送到外部设备。缓冲区⾃动记录数据,当缓冲区满时,系统将数据全部发送到相应的外部设备。当从⼀个缓冲流中读取数据时,系统实际是从缓冲区中读取数据,当缓冲区为空时,系统就会从相关外部设备⾃动读取数据,并读取尽可能多的数据填满缓冲区。使⽤数据流来处理输⼊输出的⽬的是使程序的输⼊输出操作独⽴于相关设备,由于程序不需关注具体设备实现的细节(具体细节由系统处理),所以对于各种输⼊输出设备,只要针对流做处理即可,不需修改源程序,从⽽增强了程序的可移植性。
Okio 关键类介绍
ByteStrings and Buffers
Okio 是围绕这两种类型构建的,它们将⼤量功能打包到⼀个简单的 API 中:
ByteString 是不可变的字节序列。对于字符数据,最基本的就是String。⽽ByteString 就像是String 的兄弟⼀般,它使得将⼆进制数据作为⼀个变量值变得容易。这
个类很聪明:它知道如何将⾃⼰编码和解码为⼗六进制、base64 和 utf-8。
Buffer 是⼀个可变的字节序列。像 Arraylist ⼀样,你不需要预先设置缓冲区的⼤⼩。你可以将缓冲区读写为⼀个队列:将数据写到队尾,然后从队头读取。
在内部,ByteString和Buffer做了⼀些聪明的事情来节省CPU和内存。如果您将UTF-8字符串编码为ByteString,它会缓存对该字符串的引⽤,这样,如果您稍后对其进⾏解码,就不需要做任何⼯作。
Buffer 是作为⽚段的链表实现的。当您将数据从⼀个缓冲区移动到另⼀个缓冲区时,它会重新分配⽚段的持有关系,⽽不是跨⽚段复制数据。这对多线程特别有⽤:与⽹络交互的⼦线程可以与⼯作线程交换数据,⽽⽆需任何复制或多余的操作。
Sources and Sinks
java.io 设计的⼀个优雅部分是如何对流进⾏分层来处理加密和压缩等转换。Okio 有⾃⼰的 stream 类型: Source 和Sink,分别类似于 java 的Inputstream 和Outputstream,但是有⼀些关键区别:
超时(Timeouts)。流提供了对底层 I/O 超时机制的访问。与java.io 的 socket 字流不同,read() 和write() ⽅法都给予超时机制。
易于实施。source 只声明了三个⽅法:read()、close() 和timeout()。没有像available()或单字节读取这样会导致正确性和性能意外的危险操作。
使⽤⽅便。虽然source 和sink 的实现只有三种⽅法可写,但是调⽤⽅可以实现Bufferedsource 和Bufferedsink 接⼝, 这两个接⼝提供了丰富API能够满⾜你所需的⼀
切。
字节流和字符流之间没有⼈为的区别。都是数据。你可以以字节、UTF-8 字符串、big-endian 的32位整数、little-endian 的短整数等任何你想要的形式进⾏读
写;不再有InputStreamReader!
易于测试。Buffer 类同时实现了BufferedSource 和BufferedSink 接⼝,即是 source 也是 sink,因此测试代码简单明了。
Sources 和 Sinks 分别与InputStream和OutputStream 交互操作。你可以将任何Source 看做InputStream,也可以将任何InputStream当做Source。对
于Sink 和Outputstream 也是如此。
Segment
Segment在 Okio 中作为数据缓冲的载体,⼀个 Segment 的数据缓冲⼤⼩为 8192,即 8k。每⼀个 Segment 都有前驱和后继结点,也就是说 Sement 是⼀个双向链表链表,准确的来说是⼀个双向循环链表。读取数据从 Segment 头结点读取,写数据从 Segment 尾结点写。
Okio 中引⼊池的概念也就是源码中SegmentPool的实现。SegmentPool 负责 Segment 创建和销毁,SegmentPool 最⼤可以缓存 8 个 Segment。
SegmentPool 是⼀个静态⽅法,因此也就是全局缓存只有 64 kb;
整体设计
前⾯说了介绍了很多关键的类,下⾯看下 Okio 的整体设计:
图⽚摘⾃
通过类图来看,整体设计是很简单明了的,可以结合前⾯介绍的关键类,这样你会更加理解这个设计图。
Okio 读写流程
在介绍 Okio 的读写流程的时候,还是得提⼀下⼀个关键的类:Okio。
Okio 类是⼯具类,内部提供了很多静态⽅法,⽅便⼤家调⽤,减少⼤家写了很多重复的代码,使得整个调⽤变得更加简单。
读⽂本⽂件
public void readLines(File file) throws IOException {
Source fileSource = Okio.source(file);
BufferedSource bufferedSource = Okio.buffer(fileSource);
for (String line; (line = adUtf8Line()) != null; ) {
System.out.println(line);
}
bufferedSource.close();
}
这个⽰例代码是⽤来读取⽂本⽂件的,Okio 通过Okio.source(File)的⽅式来读取⽂件流,它返回的是⼀个 Source 对象,但是 Source 对象的⽅法是⽐较少的(只有3个),因此Okio 提供了⼀个装饰者对象接⼝BufferedSource,通过Okio.buffer(fileSource) 来⽣成,这个⽅法内部实际会⽣成⼀个RealBufferedSource 类对象,RealBufferedSource 内部持有Buffer缓冲对象可使 IO 速度更快,该类实现了BufferedSource接⼝,⽽BufferedSource 接⼝提供了⼤量丰富的接⼝⽅法:
可以看到,⼏乎你想从输⼊流中读取任何的数据类型都可以,⽽不需要你⾃⼰去转换,可以说是⾮常强⼤⽽且⼈性化了,除了 read ⽅法以外,还有⼀些别的⽅法,可以说⼏乎可以满⾜很多需求。
在上⾯的⽰例代码中,打开输⼊流对象的⽅法需要负责关闭对象资源,调⽤ close ⽅法,Okio 官⽅推荐使⽤ java 的try-with-source 语法,上⾯⽰例代码可以写成下⾯这样:
public void readLines(File file) throws IOException {
try (BufferedSource bufferedSource = Okio.buffer(Okio.source(file))) {
for (String line; (line = adUtf8Line()) != null; ) {
System.out.println(line);
}
}
}
try-with-source 是 jdk1.4 开始提供的语法糖,在try 语句 () ⾥⾯的资源对象,jdk 最终会⾃动调⽤它的close ⽅法去关闭它, 即便try ⾥有多个资源对象也是可以的,这样就不⽤你⼿动去关闭资源了。但是在 android ⾥⾯使⽤的话,会提⽰你要求 API level 最低为19 才可以。
写⽂本⽂件
public void writeEnv(File file) throws IOException {
try (BufferedSink sink = Okio.buffer(Okio.sink(file))) {
sink.writeUtf8("啊啊啊")
.writeUtf8("=")
.writeUtf8("aaa")
.writeUtf8("\n");
}
}
其中Okio.buffer(fileSink) 内部返回的实现对象是⼀个RealBufferedSink 类的对象,跟RealBufferedSource⼀样它也是⼀个装饰者对象,具备Buffer 缓冲功能。
类似于读⽂件使⽤Source 和BufferedSource,写⽂件的话,则是使⽤的Sink和BufferedSink,同样的在BufferedSink 接⼝中也提供了丰富的接⼝⽅法,这⾥就不展开了,具体可以查看代码。
源码分析
通过上⾯的介绍,⼤家对 Okio 的读取有了⼀个基本的了解。下⾯开始进⼊源码分析,深⼊去研究其实现,再介绍源码的时候,会先对⼀些接⼝做⼀些简单的介绍。
Source & Sink
public interface Sink extends Closeable, Flushable {
/** Removes {@code byteCount} bytes from {@code source} and appends them to this. */  从 source 中获取到的数据添加到 sink ⾃⾝
void write(Buffer source, long byteCount) throws IOException;
/** Pushes all buffered bytes to their final destination. */
@Override void flush() throws IOException;
/** Returns the timeout for this sink. */
Timeout timeout();
@Override void close() throws IOException;
}
public interface Source extends Closeable {
/**
* Removes at least 1, and up to {@code byteCount} bytes from this and appends
* them to {@code sink}. Returns the number of bytes read, or -1 if this
* source is exhausted.  将⾃⾝数据给 sink
*/
long read(Buffer sink, long byteCount) throws IOException;
/** Returns the timeout for this source. */
Timeout timeout();
*/
@Override void close() throws IOException;
}
这两个是 Okio 中最基本的两个接⼝,分别对应 java 的InputStream 和OutputStream 即输⼊流和输出流,Source 是输⼊流,Sink 是输出流。接⼝提供的⽅法也是⾮常简单,⼤家⼀看就知道这⼏个⽅法的⽬的。
BufferedSink & BufferedSource
上⾯ Source和 Sink 提供了极简的接⼝,接着作者对这两个接⼝进⾏丰富的扩展。具体接⼝⽅法上⽂已介绍,这⾥也不在展开。
这⾥简单提⼀点,这种设计风格是值得我们去学习的,设计接⼝的时候要简单,专⼀。然后可以再新建⼀个接⼝,去丰富扩展其功能。这样使⽤者可以选择⾃⼰想要的接⼝来进
⾏实现。
RealBufferedSource & RealBufferedSink
在我们通过Okio.source() 和Okio.sink() 获取了 Souce 和 Sink 对象后,⼀般不会直接使⽤,⽽是会再调⽤⼀次Okio.buffer() ⽣成⼀个实现BufferedSource 和BufferedSink 接⼝的对象:
/**
* Returns a new source that buffers reads from {@code source}. The returned
* source will perform bulk reads into its in-memory buffer. Use this wherever
* you read a source to get an ergonomic and efficient access to data.
*/
public static BufferedSource buffer(Source source) {
return new RealBufferedSource(source);
}
/**
* Returns a new sink that buffers writes to {@code sink}. The returned sink
* will batch writes to {@code sink}. Use this wherever you write to a sink to
* get an ergonomic and efficient access to data.
*/
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
内部分别返回的是RealBufferedSource 和RealBufferedSink 对象,他们分别实现了BufferedSource 和BufferedSink接⼝,⽽这两个接⼝则是分别继承了Source 和Sink 接⼝的并基础上进⾏
了⽅法扩展,提供了丰富的读写接⼝⽅法,⼏乎可以对各种基础数据类型进⾏读写。
Segment 及 SegmentPool
Segment 是Okio 中⾮常重要的⼀环,它可以说是Buffer 中数据的载体。容量是 8kb,头结点为 head。
final class Segment {
//Segment的容量,最⼤为8kb
static final int SIZE = 8192;
//如果Segment中字节数 > SHARE_MINIMUM时(⼤Segment),就可以共享,不能添加到SegmentPool
static final int SHARE_MINIMUM = 1024;
//存储的数据
final byte[] data;
//下⼀次读取的开始位置
int pos;
//写⼊的开始位置
int limit;
//当前Segment是否可以共享
boolean shared;
//data是否仅当前Segment独有,不share
boolean owner;
//后继节点
Segment next;
//前驱节点
Segment prev;
...
//移除当前Segment
public final @Nullable Segment pop() {
Segment result = next != this ? next : null;
< = next;
next.prev = prev;
next = null;
prev = null;
return result;
}
//在当前节点后添加⼀个新的节点
public final Segment push(Segment segment) {
segment.prev = this;
< = next;
next.prev = segment;
next = segment;
return segment;
}
//将当前Segment分裂成2个Segment结点。前⾯结点pos~limit数据范围是[pos..pos+byteCount),后⾯结点pos~limit数据范围是[pos+byteCount..limit)
public final Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
//如果字节数⼤于SHARE_MINIMUM则拆分成共享节点
if (byteCount >= SHARE_MINIMUM) {
prefix = sharedCopy();
} else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
//当前Segment结点和prev前驱结点合并成⼀个Segment,统⼀合并到prev,然后当前Segment结点从双向链表移除并添加到SegmentPool复⽤。当然合并的前提是:2个Segment的字节总和不超过8K。合并后可能会移动pos、limit public final void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return; // Cannot compact: prev isn't writable.
int byteCount = limit - pos;
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
writeTo(prev, byteCount);
pop();
}
//从当前节点移动byteCount个字节到sink中
public final void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
if (sink.shared) throw new IllegalArgumentException();
公司介绍源码
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
}
SegmentPool 是⼀个Segment 池,内部维护了⼀个Segment 单向链表,容量为64kb(8 个Segment),回收不⽤的Segment 对象。
final class SegmentPool {
//SegmentPool的最⼤容量
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
//后继节点
static Segment next;
//当前池内的总字节数
static long byteCount;
private SegmentPool() {
}
//从池中获取⼀个Segment对象
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = ;
< = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
//将Segment状态初始化并放⼊池中
static void recycle(Segment segment) {
if ( != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
< = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
}
SegmentPool 可以理解为⼀个缓存Segment的池,它只有两个⽅法,⼀个take(),⼀个recycle(),在SegmentPool 中维护的是⼀个Segment的单链表,并且它的最⼤值为MAX_SIZE = 64 * 1024 也就是64kb 即 8 个 Segment 的长度,next 就是单链表中的头结点。
take() ⽅法的作⽤是取出单链表的头结点 Segment 对象,然后将取出的对象与链表断开并将链表往后移动⼀个单位,如果是第⼀次调⽤ take, next 为 null, 则会直接 new ⼀个Segment 对象返回,并且这⾥创建的Segment是不共享的。
recycle() ⽅法的作⽤则是回收⼀个 Segment 对象,被回收的 Segment 对象将会被插⼊到SegmentPool 中的单链表的头部,以便后⾯继续复⽤,并且这⾥源码我们也可以看到如果是shared 的对象是不处理的,
如果是第⼀次调⽤recycle() ⽅法则链表会由空变为拥有⼀个节点的链表,每次回收就会插⼊⼀个到表头,直到超过最⼤容量。
Buffer
如果你只看Segment 的话还是很难理解整个数据的读写流程,因为你只知道它是能够形成⼀个链表的东西,但是当你看完Buffer 之后完整的流程就会清晰多了。
Buffer 类是 Okio 中最核⼼并且最丰富的类了,前⾯分析发现最终的 Source 和 Sink 实现对象中,都是通过该类完成读写操作,⽽ Buffer 类同时实现
了BufferedSource 和BufferedSink 接⼝,因此Buffer 具备 Okio 中的读和写的所有⽅法,所以这个类的⽅法超多!我们只⼀个读和写的⽅法来看⼀下实现好了。
写byte[]操作:
@Override
public Buffer write(byte[] source, int offset, int byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
// 检测参数的合法性
checkOffsetAndCount(source.length, offset, byteCount);
// 计算 source 要写⼊的最后⼀个字节的 index 值
int limit = offset + byteCount;
while (offset < limit) {
// 获取循环链表尾部的⼀个 Segment
Segment tail = writableSegment(1);
// 计算最多可写⼊的字节
int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
// 把 source 复制到 data 中
System.arraycopy(source, offset, tail.data, tail.limit, toCopy);
/
/ 调整写⼊的起始位置
offset += toCopy;
// 调整尾部Segment 的 limit 位置
tail.limit += toCopy;
}
// 调整 Buffer 的 size ⼤⼩
size += byteCount;
return this;
}
写操作内部是调⽤System.arraycopy 进⾏字节数组的复制,这⾥是写到 tail 对象,也就是循环链表的链尾Segment 对象当中,⽽且这⾥会不断循环的获取链尾Segment 对象进⾏写⼊。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。