网络数据的基本单位是字节,Java NIO 提供了 ByteBuffer
作为它的字节容器,但是这个类使用很频繁且过于复杂。所以 Netty 提供了 ByteBuf
作为字节容器,用于替代 ByteBuffer
的工作。
Netty 的数据处理 API 通过两个组件暴露 :ByteBuf
抽象类、ByteBufHolder
接口。
ByteBuf API的优点:
- 它可以被用户自定义的缓冲区类型扩展。
- 通过内置的复合缓冲区类型实现了透明的零拷贝。
- 容器可以按需增长(类似于 JDK 的 StringBuilder )。
- 在读和写这两种模式之间切换不需要调用 ByteBuffer 的 flip() 方法。
- 读和写使用了不同的索引。
- 支持方法的链式调用。
- 支持引用计数。
- 支持池化。
其他类可用于管理 ByteBuf 实例的分配,以及执行各种针对数据容器本身和它所持有的数据的操作。
ByteBuf 类
ByteBuf 维护了两个不同的索引:一个用于读取,一个用于写入。
当你从 ByteBuf 读取时,它的 readerIndex
将会被递增已经被读取的字节数。
当你写入 ByteBuf 时,它的 writerIndex
也会被递增。
名称以 read
或者 write
开头的 ByteBuf 方法,将会推进其对应的索引。而名称以 set
或者 get
开头的操作则不会。后面这些方法将在作为一个参数传入的一个相对索引上执行操作。
可以指定 ByteBuf 的最大容量,视图移动写索引(writerIndex)超过这个值将会触发一个异常。ByteBuf 默认的容量限制是 Integer.MAX_VALUE 。
ByteBuf 的使用模式
ByteBuf 提供了三种使用模式,分别是堆缓冲区、直接缓冲区、复合缓冲区。
堆缓冲区
最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中。这种模式被称为支撑数组(blocking array),它能在没有使用池化的情况下提供快速的分配和释放。非常适合有遗留的数据需要处理的情况。
ByteBuf heapBuf = ...;
// 检查 ByteBuf 是否有一个支撑数组
if (heapBuf.hasArray()) {
// 如果有,则获取对该数组的引用
byte[] array = heapBuf.array();
// 计算第一个字节的偏移量
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
// 获取可读字节数
int length = heapBuf.readableBytes();
// 使用数组、偏移量和长度作为参数调用你的方法
handleArray(array, offset, length);
}
当 hasArray()
方法返回 false
时,尝试访问支撑数组将触发一个 UnsupportedOperationException
异常。这个模式类似于 JDK 的 ByteBuffer 的用法。
直接缓冲区
直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外,相对于堆缓冲区,它们的分配和释放都较为昂贵。
ByteBuf directBuf = ...;
// 检查 ByteBuf 是否由数组支撑。如果不是,则这是一个直接缓冲区
if (!directBuf.hasArray()) {
// 获取可读的字节数
int length = directBuf.readableBytes();
// 分配一个新的数组来保存具有该长度的字节数据
byte[] array = new byte[length];
// 将字节复制到该数组
directBuf.getBytes(directBuf.readerIndex(), array);
// 使用数组、偏移量和长度作为参数调用你的方法
handleArray(array, 0, length);
}
复合缓冲区
复合缓冲区为多个 ByteBuf 提供一个聚合视图,可以根据需要添加或删除 ByteBuf 实例。Netty 通过一个 ByteBuf 子类 CompositeByteBuf
实现了这个模式,它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。
注意:CompositeByteBuf 中的 ByteBuf 实例可能同时包含直接内存分配和非直接内存分配。如果其中只有一个实例,那么对 CompositeByteBuf 上的 hasArray()
方法的调用将返回该组件上的 hasArray()
方法的值,否则它将返回 false
。
为了举例说明,让我们考虑一下一个由头部和主体两部分组成的将通过 HTTP 协议 传输的消息。这两部分由应用程序的不同模块产生,将会在消息被发送的时候组装。该应用程序可以选择为多个消息重用相同的消息主体。当这种情况发生时,对于每个消息都将会创建一个新的头部。 因为我们不想为每个消息都重新分配这两个缓冲区,所以使用 CompositeByteBuf 是一个 完美的选择。它在消除了没必要的复制的同时,暴露了通用的 ByteBuf API。
使用 ByteBuffer 的复合缓冲区模式:
ByteBuffer[] message = new ByteBuffer[] { header, body };
ByteBuffer message2 = ByteBuffer.allocate(header.remaining() + body.remaining());
message2.put(header);
message2.put(body);
message2.flip();
使用 CompositeByteBuf 的复合缓冲区模式:
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ...;
ByteBuf bodyBuf = ...;
// 将 ByteBuf 实例追加到 CompositeByteBuf
messageBuf.addComponents(headerBuf, bodyBuf);
.....
// 删除位于索引位置 0 的 ByteBuf
messageBuf.removeComponent(0);
// 循环遍历所有的 ByteBuf 实例
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
访问 CompositeByteBuf 中的数据:
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
// 获得可读字节数
int length = compBuf.readableBytes();
// 分配一个具有可读字节数长度的新数组
byte[] array = new byte[length];
// 将字节读到该数组中
compBuf.getBytes(compBuf.readerIndex(), array);
// 使用偏移量和长度作为参数使用该数组
handleArray(array, 0, array.length);
Netty 使用了 CompositeByteBuf 来优化套接字的 I/O 操作,尽可能地消除了由 JDK 的缓冲区实现所导致的性能以及内存使用率的问题。这种优化发生在 Netty 的核心代码中,不会暴露出来,但我们应该知道它带来的影响。
字节级操作
如同在普通的 Java 字节数组中一样, ByteBuf 的索引也是从 0 开始的:第一个字节的索引是 0
,最后一个字节的索引是总 capacity() - 1
。
遍历 ByteBuf :
ByteBuf buffer = ...;
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);
System.out.println((char)b);
}
虽然 ByteBuf 同时具有读索引和写索引,但是 JDK 的 ByteBuffer 却只有一个索引,这也就是为什么必须调用 flip()
方法来在读模式和写模式之间进行切换的原因。
可丢弃字节
ByteBuf 的内部分段可以分成三个部分:可丢弃字节、可读字节、可写字节。如下图所示:
已经被读过的字节,可以调用 discardReadBytes()
方法丢弃并回收空间。其实只是移动了可读的字节以及 writerIndex
,并没有对丢弃的部分进行擦除。
可读字节
ByteBuf 的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的 readerIndex
值为 0
。任何名称以 read
或者 skip
开头的操作都将会检索或者跳过位于当前 readerIndex
的数据,并且将它增加已读字节数。
如果被调用的方法需要一个 ByteBuf 参数作为写入的目标,并且没有指定目前索引参数,那么该目标缓冲区的 writerIndex
也将被增加,例如:readBytes(ByteBuf dest)
。
如果尝试在缓冲区的可读字节数已经耗尽时从中读取数据,那么将会引发一个 IndexOutofBoundsException
异常。合理读取数据:
ByteBuf buffer = ...;
while (buffer.isReadable()) {
System.out.println(buffer.readByte());
}
可写字节
可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的 writerIndex
的默认值为 0
。任何名称以 write
开头的操作都将从当前的 writerIndex
处开始写数据,并将它增加已经写入的字节数。
如果写操作的目标也是 ByteBuf ,并且没有指定源索引的值,则源缓冲区 readerIndex
也同样会被增加相同的大小,例如:writeBytes(ByteBuf dest)
。
如果尝试往目标写入超过目标容量的数据,将会引发一个 IndexOutOfBoundsException
异常。合理写入数据:
ByteBuf buffer = ...;
while (buffer.writableBytes() >= 4) {
buffer.writeInt(random.nextInt());
}
索引管理
JDK 的 InputStream 定义了 mark(int readlimit)
和 reset()
方法,这些方法分别被用来将流中的当前位置标记为指定的值,以及将流重置到该位置。
同样,可以通过调用 markReaderIndex()
、 markWriterIndex()
、 resetWriterIndex()
和 resetReaderIndex()
来标记和重置 ByteBuf 的 readerIndex
和 writerIndex
。
也可以通过调用 readerIndex(int)
或者 writerIndex(int)
来将索引移动到指定位置。试图将任何一个索引设置到一个无效的位置都将导致一个 IndexOutOfBoundsException
异常。
可以通过调用 clear()
方法来将 readerIndex
和 writerIndex
都设置为 0
。注意这并不会清除内存中的内容。调用 clear()
比调用 discardReadBytes()
轻量得多,因为它将只是重置索引而不会复制任何的内存。
查找操作
使用 indexOf()
方法来简单查找,使用带 ByteBufProcessor
接口的参数的方法来进行复杂查询。这个接口只定义了一个 boolean process(byte value)
方法,它将检查输入值是否是正在查找的值。
ByteBufProcessor 针对一些常见的值定义了许多便利的方法。假设你的应用程序需要和所谓的包含有以NULL结尾的内容的Flash套接字集成。调用 forEachByte(ByteBufProcessor.FIND_NUL)
将简单高效地消费该 Flash 数据,因为在处理期间只会执行较少的边界检查。
ByteBuf buffer = ...;
int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);
ByteBufProcessor 在 Netty 4.1.x 开始已经废弃,请使用 io.netty.util.ByteProcessor 。
派生缓冲区
派生缓冲区为 ByteBuf 提供了以专门的方式来呈现其内容的视图,这类视图是通过以下方法被创建的:
duplicate()
slice()
slice(int, int)
Unpooled.unmodifiableBuffer(...)
order(ByteOrder)
readSlice(int)
这些方法都会返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记索引。其内部存储和 JDK 的 ByteBuffer 一样也是共享的。这使得派生缓冲区的创建成本是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所以要小心。
如果需要一个现有缓冲区的真实副本,请使用 copy()
或者 copy(int, int)
方法。不同于派生缓冲区,由这个调用所返回的 ByteBuf 拥有独立的数据副本。
对 ByteBuf 进行切片:
Charset utf8 = Charset.forName("UTF-8");
// 创建一个用于保存给定字符串的字节的 ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
// 创建该 ByteBuf 从索引 0 开始到索引 15 结束的一个新切片
ByteBuf sliced = buf.slice(0, 15);
// 将打印 Netty in Action
System.out.println(sliced.toString(utf8));
// 更新索引 0 处的字节
buf.setByte(0, (byte)'J');
// 断言将会成功,因为数据是共享的,对其中一个做的更改对另外一个也是可见的
assert buf.getByte(0) == sliced.getByte(0);
复制一个 ByteBuf:
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
// 创建该 ByteBuf 从索引 0 开始到索引 15 结束的分段的副本
ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(utf8));
buf.setByte(0, (byte) 'J');
// 断言将会成功,因为数据不是共享的
assert buf.getByte(0) != copy.getByte(0);
读/写操作
读/写操作分为两种类别:
get()
和set()
操作,从给定的索引开始,并且保持索引不变。read()
和write()
操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整。
最常用的 get() 方法:
getBoolean(int)
:返回给定索引处的Boolean值getByte(int)
:返回给定索引处的字节getUnsignedByte(int)
:返回给定索引处的无符号字节值作为 short 返回getMedium(int)
:返回指定的索引处的 24 位中等的 int 值getUnsignedMedium(int)
:返回给定索引处的无符号的 24 位的中等 int 值getInt(int)
:返回给定索引处的 int 值getUnsignedInt(int)
:返回给定索引处的无符号 int 值作为 long 返回getLong(int)
:返回给定索引值处的 long 值getShort(int)
:返回给定索引处的 short 值getUnsignedShort(int)
:返回给定索引处的无符号 short 值作为 int 返回getBytes(int, ...)
:将该缓冲区中从给定索引开始的数据传送到指定的目的地
大多数 get()
操作都有一个对应的 set()
方法,例如:
setBoolean(int index, boolean value)
:设定给定索引处的 Boolean 值setByte(int index, int value)
:设定给定索引处的字节值setMedium(int index, int value)
:设定给定索引处的 24 位的中等 int 值setInt(int index, int value)
:设定给定索引处的 int 值setLong(int index, long value)
:设定给定索引处的 long 值setShort(int index, int value)
:设定给定索引处的 short 值
get()
和 set()
方法的用法:
Charset utf8 = Charset.forName("UTF-8");
// 创建一个新的 ByteBuf 以保存给定字符串的字节
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
// 打印第一个字符 'N'
System.out.println((char)buf.getByte(0));
// 存储当前的 readerIndex 和 writerIndex
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
// 将索引 0 处的字节更新为字符 'B'
buf.setByte(0, (byte)'B');
// 打印第一个字符,现在是 'B'
System.out.println((char)buf.getByte(0));
// 断言将会成功,因为这些操作并不会修改相应的索引
assert readerIndex == buf.readerIndex();
assert writerIndex == buf.writerIndex();
现在,让我们研究一下 read()操作,其作用于当前的 readerIndex 或 writerIndex。 这些方法将用于从 ByteBuf 中读取数据,如同它是一个流。
readBoolean()
:返回当前 readerIndex 处的 Boolean,并将 readerIndex 增加 1readByte()
:返回当前 readerIndex 处的字节,并将 readerIndex 增加 1readUnsignedByte()
:将当前 readerIndex 处的无符号字节值作为 short 返回,并将 readerIndex 增加 1readMedium()
:返回当前 readerIndex 处的 24 位的中等 int 值,并将 readerIndex 增加3readUnsignedMedium()
:返回当前 readerIndex 处的 24 位的无符号的中等 int 值,并将 readerIndex 增加 3readInt()
:返回当前 readerIndex 的 int 值,并将 readerIndex 增加 4readUnsignedInt()
:将当前 readerIndex 处的无符号的 int 值作为 long 值返回,并将 readerIndex 增加 4readLong()
:返回当前 readerIndex 处的 long 值,并将 readerIndex 增加 8readShort()
:返回当前 readerIndex 处的 short 值,并将 readerIndex 增加 2readUnsignedShort()
:将当前 readerIndex 处的无符号 short 值作为 int 值返回,并将 readerIndex 增加 2readBytes(ByteBuf | byte[] destination, int dstIndex [,int length])
:将当前 ByteBuf 中从当前 readerIndex 处开始的(如果设置了, length 长度的字节)数据传送到一个目标 ByteBuf 或者 byte[],从 目标的 dstIndex 开始的位置。本地的 readerIndex 将被增加已经传 输的字节数
对应的 write() 方法:
writeBoolean(boolean)
:在当前writerIndex处写入一个Boolean,并将 writerIndex增加1-
writeByte(int)
:在当前 writerIndex 处写入一个字节值,并将 writerIndex 增加 1 -
writeMedium(int)
:在当前 writerIndex 处写入一个中等的 int 值,并将 writerIndex 增加3 -
writeInt(int)
:在当前 writerIndex 处写入一个 int 值,并将 writerIndex 增加 4 -
writeLong(long)
:在当前 writerIndex 处写入一个 long 值,并将 writerIndex 增加 8 -
writeShort(int)
:在当前 writerIndex 处写入一个 short 值,并将 writerIndex 增加 2 -
writeBytes(source ByteBuf |byte[] [,int srcIndex ,int length])
:从当前 writerIndex 开始,传输来自于指定源(ByteBuf 或者 byte[]) 的数据。如果提供了 srcIndex 和 length,则从 srcIndex 开始读取, 并且处理长度为 length 的字节。当前 writerIndex 将会被增加所写入 的字节数
ByteBuf 上的 read()
和 write()
操作:
Charset utf8 = Charset.forName("UTF-8");
// 创建一个新的 ByteBuf 以保存给定字符串的字节
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
// 打印第一个字符 'N'
System.out.println((char)buf.readByte());
// 存储当前的 readerIndex 和 writerIndex
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
// 将字符 '?' 追加到缓冲区
buf.writeByte((byte)'?');
assert readerIndex == buf.readerIndex();
// 断言将会成功,因为 writeByte() 方法移动了 writerIndex
assert writerIndex != buf.writerIndex();
更多的操作
ByteBuf 其他有用的操作:
isReadable()
:如果至少有一个字节可供读取,则返回 trueisWritable()
:如果至少有一个字节可被写入,则返回 truereadableBytes()
:返回可被读取的字节数writableBytes()
:返回可被写入的字节数capacity()
:返回 ByteBuf 可容纳的字节数。在此之后,它会尝试再次扩展直到达到 maxCapacity()maxCapacity()
:返回 ByteBuf 可以容纳的最大字节数hasArray()
:如果 ByteBuf 由一个字节数组支撑,则返回 truearray()
:如果 ByteBuf 由一个字节数组支撑则返回该数组;否则,它将抛出一个 UnsupportedOperationException 异常
ByteBufHolder 接口
我们经常发现,除了实际的数据负载之外,我们还需要存储各种属性值。HTTP 响应便是一个很好的例子,除了表示为字节的内容,还包括状态码、cookie 等。
为了处理这种常见的用例,Netty 提供了 ByteBufHolder
。ByteBufHolder 也为 Netty 的高级特性提供了支持,如缓冲区池化,其中可以从池中借用 ByteBuf ,并且在需要时自动释放。
ByteBufHolder 只有几种用于访问底层数据和引用计数的方法:
content()
:返回由这个 ByteBufHolder 所持有的 ByteBufcopy()
:返回这个 ByteBufHolder 的一个深拷贝,包括一个其所包含的 ByteBuf 的非共享拷贝duplicate()
:返回这个ByteBufHolder的一个浅拷贝,包括一个其所包含的 ByteBuf 的共享拷贝
ByteBuf 分配
按需分配:ByteBufAllocator
为了降低分配和释放内存的开销,Netty 通过 ByteBufAllocator 接口实现了 ByteBuf 的池化,它可以用来分配我们所描述过的任意类型的 ByteBuf 实例。使用池化是特定于应用程序的决定,其并不会以任何方式改变 ByteBuf API 的语义。
返回一个基于堆或者直接内存 存储的 ByteBuf :
buffer()
buffer(int initialCapacity);
buffer(int initialCapacity, int maxCapacity);
返回一个基于堆内存存储的 ByteBuf :
heapBuffer()
heapBuffer(int initialCapacity)
heapBuffer(int initialCapacity, int maxCapacity)
返回一个基于直接内存存储的 ByteBuf :
directBuffer()
directBuffer(int initialCapacity)
directBuffer(int initialCapacity, int maxCapacity)
返回一个可以通过添加最大到指定数目的基于堆的或者直接内存存储的缓冲区来扩展的 CompositeByteBuf :
compositeBuffer()
compositeBuffer(int maxNumComponents)
compositeDirectBuffer()
compositeDirectBuffer(int maxNumComponents);
compositeHeapBuffer()
compositeHeapBuffer(int maxNumComponents);
返回一个用于套接字的 I/O 操作的 ByteBuf :
ioBuffer()
可以通过 Channel 或者绑定到 ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用:
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator2 = ctx.alloc();
...
Netty 提供了两种 ByteBufAllocator 的实现:PooledByteBufAllocator
和 UnpooledByteBufAllocator
。前者池化了 ByteBuf 的实例以提高性能并最大限度地减少内存碎片。后者的实现不池化 ByteBuf 实例,并且在每次它被调用时都会返回一个新的实例。Netty 默认使用 PooledByteBufAllocator ,我们可以通过 ChannelConfig 或者在引导应用程序时指定一个不同的分配器来更改。
ByteBufUtil 类
ByteBufUtil 提供了用于操作 ByteBuf 的静态的辅助方法。因为这些 API 是通用的,并且和池化无关,所以这些方法已然在分配类的外部实现。
这些静态方法中最有价值的可能就是 hexdump()
方法,它以十六进制的表示形式打印 ByteBuf 的内容。这在各种情况下都很有用。例如,出于调试的目的记录 ByteBuf 的内容。十六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的版本还可以很容易地转换回实际的字节表示。
引用计数
引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术(iOS中的内存管理就是采用引用计数的方式,比如 ARC 自动引用计数和 MRC 手动引用计数)。
Netty 在 4.x 中为 ByteBuf 和 ByteBufHolder 引入了引用计数技术,它们都实现了 ReferenceCounted
接口。
引用计数背后的想法并不是特别的复杂,它主要涉及跟踪到某个特定对象的活动引用的数量。一个 ReferenceCounted
实现的实例将通常以活动的引用计数为 1 作为开始。只要引用计数大于 0,就能保证对象不会被释放。当活动引用的数量减少到 0 时,该实例就会被释放。注意, 虽然释放的确切语义可能是特定于实现的,但是至少已经释放的对象应该不可再用了。
引用计数对于池化实现(如 PooledByteBufAllocator )来说是至关重要的,它降低了内存分配的开销。
引用计数:
Channel channel = ...;
// 从 Channel 获取 ByteBufAllocator
ByteBufAllocator allocator = channel.alloc();
....
// 从ByteBufAllocator 分配一个 ByteBuf
ByteBuf buffer = allocator.directBuffer();
// 检查引用计数是否为预期的 1
assert buffer.refCnt() == 1;
释放引用计数的对象:
ByteBuf buffer = ...;
// 减少到该对象的活动引用。当减少到 0 时,该对象被释放,并且该方法返回 true
boolean released = buffer.release();
...
一般来说,是由最后访问(引用计数)对象的那一方来负责将它释放。试图访问一个已经被是否的引用计数的对象,将会导致一个 IllegalReferenceCountException
异常。