EmbeddedChannel 概述
Netty 提供了它所谓的 Embedded
传输,用于测试 ChannelHandler 。这个传输是一种特殊的 Channel 实现 EmbeddedChannel
的功能,这个实现提供了通过 ChannelPipeline 传播事件的简便方法。
将入站数据或者出站数据写入到 EmbeddedChannel 中,然后检查是否有任何东西到达了 ChannelPipeline 的尾端。以这种方式,你便可以确定消息是否已经被编码或解码过了,以及是否触发了任何的 ChannelHandler 动作。
特殊的 EmbeddedChannel 方法:
writeInbound(Object…msgs)
:将入站消息写到 EmbeddedChannel 中。如果可以通过 readInbound() 方法从 EmbeddedChannel 中读取到数据,则返回truereadInbound()
:从 EmbeddedChannel 中读取一个入站消息。任何返回的东西都穿越了整个 ChannelPipeline。如果没有任何可供读取的,则返回 nullwriteOutbound(Object…msgs)
:将出站消息写到 EmbeddedChannel 中。如果现在可以通过 readOutbound ( ) 方法从EmbeddedChannel中读取到什么东西,则返回truereadOutbound()
:从 EmbeddedChannel 中读取一个出站消息。任何返回的东西都穿越了整个 ChannelPipeline。如果没有任何可供读取的,则返回 nullfinish()
:将 EmbeddedChannel 标记为完成,并且如果有可被读取的入站数据或者出站数据,则返回 true。这个方法还将会调用 EmbeddedChannel 上的 close() 方法
入站数据由 ChannelInboundHandler 处理,代表从远程节点读取的数据。出站数据由 ChannelOutboundHandler 处理,代表将要写到远程节点的数据。根据你要测试的 ChannelHandler ,你将使用 *Inbound()
或 *Outbound()
方法对。
你可以使用 writeOutbound()
方法将消息写到 Channel 中,并通过 ChannelPipeline 沿着出站的方向传递。随后,你可以使用 readOutbound()
方法来读取已被处理过的消息,以确定结果是否和预期一样。类似地,对于入站数据,你需要使用 writeInbound()
和 readInbound()
方法。
EmbeddedChannel 的数据流:
使用 EmbeddedChannel 测试 ChannelHandler
测试入站消息
下图展示了一个简单的 ByteToMessageDecoder
实现。给定足够的数据,这个实现将产生固定大小的帧。如果没有足够的数据可供读取,它将等待下一个数据块的到来,并将再次检查是否能够产生一个新的帧。
FixedLengthFrameDecoder :
// 扩展 ByteToMessageDecoder 以处理入站字节,并将它们解码为消息
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException("frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 检查是否有足够的字节可被读取,以生成下一个帧
while (in.readableBytes() >= frameLength) {
// 从 ByteBuf 中读取一个新帧
ByteBuf buf = in.readBytes(frameLength);
// 将该帧添加到已被解码的消息列表中
out.add(buf);
}
}
}
测试 FixedLengthFrameDecoder :
public class FixedLengthFrameDecoderTest {
@Test
public void testFramesDecoded() {
// 创建一个 ByteBuf , 并存储 9 个字节
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
// 创建一个 EmbeddedChannel ,并添加一个 FixedLengthFrameDecoder,其将以 3 字节的帧长度被测试
EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
// 将数据写入 EmbeddedChannel
Assert.isTrue(channel.writeInbound(input.retain()));
// 标记 Channel 为已完成状态
Assert.isTrue(channel.finish());
// 读取所生成的消息,并且验证是否有 3 帧(切片),其中每帧(切片)都为 3 字节
ByteBuf read = channel.readInbound();
Assert.isTrue(buf.readSlice(3).equals(read));
read.release();
read = channel.readInbound();
Assert.isTrue(buf.readSlice(3).equals(read));
read.release();
read = channel.readInbound();
Assert.isTrue(buf.readSlice(3).equals(read));
read.release();
Assert.isNull(channel.readInbound());
buf.release();
}
@Test
public void testFramesDecoded2() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
// 返回 false ,因为没有一个完整的可供读取的帧
Assert.isTrue(!channel.writeInbound(input.readBytes(2)));
Assert.isTrue(channel.writeInbound(input.readBytes(7)));
Assert.isTrue(channel.finish());
ByteBuf read = channel.readInbound();
Assert.isTrue(buf.readSlice(3).equals(read));
read.release();
read = channel.readInbound();
Assert.isTrue(buf.readSlice(3).equals(read));
read.release();
read = channel.readInbound();
Assert.isTrue(buf.readSlice(3).equals(read));
read.release();
Assert.isNull(channel.readInbound());
buf.release();
}
}
测试出站消息
使用 EmbeddedChannel 测试一个编码器形式的 ChannelOutboundHandler ,编码器是一种将一种消息转换为另一种的组件。AbsIntegerEncoder 是 Netty 的 MessageToMessageEncoder 的一个特殊化的实现,用于将负值整数转换为绝对值。
下面的示例将会按照下列方式工作:
- 持有 AbsIntegerEncoder 的 EmbeddedChannel 将会以 4 字节的负整数的形式写出站数据。
- 编码器将从传入的 ByteBuf 中读取每个负整数,并将会调用 Math.abs() 方法来获取其绝对值。
- 编码器将会把每个负整数的绝对值写到 ChannelPipeline 中。
AbsIntegerEncoder :
// 扩展 MessageToMessageEncoder 以将一个消息编码为另外一种格式
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
// 检查是否有足够的字节来编码
while (in.readableBytes() >= 4) {
// 从输入的 ByteBuf 中读取下一个整数,并且计算其绝对值
int value = Math.abs(in.readInt());
// 将该整数写入到编码消息的 List 中
out.add(value);
}
}
}
测试 AbsIntegerEncoder :
public class AbsIntegerEncoderTest {
@Test
public void testEncoded() {
// 创建一个 ByteBuf ,并且写入 9 个负整数
ByteBuf buf = Unpooled.buffer();
for (int i = 1; i < 10; i++) {
buf.writeInt(i * -1);
}
// 创建一个 EmbeddedChannel ,并安装一个要测试的 AbsIntegerEncoder
EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
Assert.isTrue(channel.writeOutbound(buf));
// 将该 Channel 标记为已完成状态
Assert.isTrue(channel.finish());
// 读取所产生的消息,并断言它们包含了对应的绝对值
for (int i = 1; i < 10; i++) {
Assert.isTrue(channel.readOutbound().equals(i));
}
Assert.isNull(channel.readOutbound());
}
}
测试异常处理
应用程序通常需要执行比转换数据更加复杂的任务。例如,你可能需要处理格式不正确的输入或者过量的数据。下面示例中,如果所读取的字节数超出了某个特定的限制,我们将会抛出一个 TooLongFrameException
异常。这是一种经常用来防范资源被耗尽的方法。
下图中,最大的帧数大小已经被设置为 3 字节。如果一个帧的大小超出了该限制,那么程序将会丢弃它的字节,并抛出一个 TooLongFrameException
。位于 ChannelPipeline 中的其他 ChannelHandler 可以选择在 exceptionCaaught()
方法中处理该异常或者忽略它。
FrameChunkDecoder :
// 扩展 ByteToMessageDecoder 以将入站字节解码为消息
public class FrameChunkDecoder extends ByteToMessageDecoder {
// 将要产生的帧的最大允许大小
private final int maxFrameSize;
public FrameChunkDecoder(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
if (readableBytes > maxFrameSize) {
// 如果该帧太大,则丢弃它并抛出一个 TooLongFrameException 异常
in.clear();
throw new TooLongFrameException();
}
// 否则,从 ByteBuf 中读取一个新的帧
ByteBuf buf = in.readBytes(readableBytes);
// 将该帧添加到解码消息的 List 中
out.add(buf);
}
}
测试 FrameChunkDecoder :
public class FrameChunkDecoderTest {
@Test
public void testFramesDecoded() {
// 创建一个 ByteBuf ,并向它写入 9 字节
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
// 创建一个 EmbeddedChannel ,并向其安装一个帧大小为 3 字节的 FixedLengthFrameDecoder
EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
// 向它写入 2 字节,并断言它们将会产生一个新帧
Assert.isTrue(channel.writeInbound(input.readBytes(2)));
try {
// 写入一个 4 字节大小的帧,并捕获预期的 TooLongFrameException
channel.writeInbound(input.readBytes(4));
// 如果上面没有抛出异常,那么就会到达这个断言,并且测试失败
org.junit.Assert.fail();
} catch (TooLongFrameException e) {
// expected exception
}
// 写入剩余的 2 字节,并断言将会产生一个有效帧
Assert.isTrue(channel.writeInbound(input.readBytes(3)));
// 将该 Channel 标记为已完成状态
Assert.isTrue(channel.finish());
// 读取产生的消息,并且验证值
ByteBuf read = channel.readInbound();
Assert.isTrue(buf.readSlice(2).equals(read));
read.release();
read = channel.readInbound();
Assert.isTrue(buf.skipBytes(4).readSlice(3).equals(read));
read.release();
buf.release();
}
}