什么是编解码器
每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种换换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。
如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列,那么编码器是将消息转换为适合于传输的格式(最有可能是字节流)。而对于的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,解码器处理入站数据。
解码器
解码器实现了 ChannelInboundHandler ,并将入站数据从一种格式转换为另一种格式。 Netty 提供了两个不同用例的解码器:
- 将字节解码为消息使用
ByteToMessageDecoder
和ReplayingDecoder
。 - 将一种消息类型解码为另一种使用
MessageToMessageDecoder
。
每当需要为 ChannelPipeline 中的下一个 ChannelInboundHandler 转换入站数据时会用到解码器,并且可以将多个解码器链接在一起,以实现任意复杂的转换逻辑。
抽象类 ByteToMessageDecoder
由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。ByteToMessageDecoder
有两个最重要的方法:
decode(ChannelHandlerContext ctx, ByteBuf in, List out)
:这是你必须实现的唯一抽象方法。decode()
方法被调用时将会传入一个包含了传入数据的 ByteBuf ,以及一个用来添加解码消息的 List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该 List,或者该 ByteBuf 中没有更多可读取的字节时为止。然后,如果该 List 不为空,那么它的内容将会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandlerdecodeLast( ChannelHandlerContext ctx, ByteBuf in, List out)
:Netty 提供的这个默认实现只是简单地调用了decode()
方法。 当 Channel 的状态变为非活动时,这个方法将会被调用一次。可以重写该方法以提供特殊的处理
假设你接收了一个包含简单 int 的字节流,每个 int 都需要被单独处理。在这种情况下,你需要从入站 ByteBuf 中读取每个 int ,并将它传递给 ChannelPipeline 中的下一个 ChannelInboundHandler 。为了解码这个字节流,你要扩展 ByteToMessageDecoder 类。(原子类型的 int 在被添加到 List 中时,会被自动装箱为 Integer)
下图每次从入站 ByteBuf 中读取 4 字节,将其解码为一个 int ,然后将它添加到一个 List 中。当没有更多的元素可以被添加到该 List 中时,它的内容将会被发送给下一个 ChannelInboundHandler 。
ToIntegerDecoder 类扩展了 ByteToMessageDecoder :
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 检查是否至少有 4 字节可读
if (in.readableBytes() >= 4) {
// 从入站 ByteBuf 中读取一个 int ,并将其添加到解码消息的 List 中
out.add(in.readInt());
}
}
}
抽象类 ReplayingDecoder
ReplayingDecoder
扩展了 ByteToMessageDecoder 类,使得我们不必调用 readableBytes()
方法。它通过使用一个自定义的 ByteBuf 实现,ReplayingDecoderByteBuf ,包装传入的 ByteBuf 实现了这一点,其将在内部调用 readableBytes() 方法。
这个类的完整声明是:
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
类型参数 S 指定了用于状态管理的类型,其中 Void 代表不需要状态管理。
ToIntegerDecoder2 类扩展了 ReplayingDecoder :
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
/**
* @param in 传入的 ByteBuf 是 ReplayingDecoderByteBuf
*/
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 从入站 ByteBuf 中读取一个 int ,并将其添加到解码消息的 List 中
out.add(in.readInt());
}
}
ReplayingDecoder 虽然简化了操作,但是性能稍低于 ByteToMessageDecoder 。所以,如果解码器复杂,则使用 ReplayingDecoder 以简化操作,否则使用 ByteToMessageDecoder 以提高性能。
更多的解码器:
io.netty.handler.codec.LineBasedFrameDecoder
:这个类在 Netty 内部也有使用,它使用了行尾控制字符(\n
或者\r\n
)来解析消息数据io.netty.handler.codec.http.HttpObjectDecoder
:一个 HTTP 数据的解码器
在 io.netty.handler.codec
子包下面,有更多用于特定用例的编码器和解码器实现。
抽象类 MessageToMessageDecoder
MessageToMessageDecoder
用于两个消息格式之间进行转换(例如,从一种 POJO 类型转换为另一种):
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
类型 I
指定了 decode()
方法的输入参数 msg
的类型,它是你必须实现 的唯一方法。
decode(ChannelHandlerContext ctx, I msg, List out)
:对于每个需要被解码为另一种格式的入站消息来说,该方法都将会被调用。解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler
编写一个 IntegerToStringDecoder 解码器来扩展 MessageToMessageDecoder<Integer>
。它的 decode()
方法会把 Integer 参数转换为它的 String 表示,并有了下面的签名:
public void decode( ChannelHandlerContext ctx, Integer msg, List<Object> out ) throws Exception
IntegerToStringDecoder 类:
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
@Override
public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
out.add(String.valueOf(msg));
}
}
有关更加复杂的例子,请研究 io.netty.handler.codec.http.HttpObjectAggregator
类,它扩展了 MessageToMessageDecoder<HttpObject>
。
TooLongFrameException 类
由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提供了 TooLongFrameException
类,其将由解码器在帧超出指定的大小限制时抛出。
如果你正在使用一个可变帧大小的协议,那么这种波阿虎措施将是尤为重要的,下面是一个简单的例子:
public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
private static final int MAX_FRAME_SIZE = 1024;
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readable = in.readableBytes();
// 检查缓冲区是否有超过 MAX_FRAME_SIZE 个字节
if (readable > MAX_FRAME_SIZE) {
// 跳过所有的可读字节,抛出 TooLongFrameException 并通知 ChannelHandler
in.skipBytes(readable);
throw new TooLongFrameException("Frame too big!");
}
//...
}
}
编码器
编码器实现了 ChannelOutboundHandler ,并将出站数据从一种格式转换为另一种格式,和我们方才学习的解码器的功能正好相反。Netty 提供了一组类,用于帮助你编写具有以下功能的编码器:
- 将消息编码为字节
- 将消息编码为消息
抽象类 MessageToByteEncoder
MessageToByteEncoder 和 ByteToMessageDecoder 做的事情刚好相反,并且只有一个 encode()
方法:
encode(ChannelHandlerContext ctx, I msg, ByteBuf out)
:encode() 方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为 ByteBuf 的(类型为 I 的)出站消息。该 ByteBuf 随后将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler
下面例子接受一个 Short 类型的实例作为消息,将它编码为 Short 的原子类型值,并将它写入 ByteBuf 中,其将随后被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler :
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
@Override
public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
// 将 Short 写入 ByteBuf 中
out.writeShort(msg);
}
}
Netty 提供了一些专门化的 MessageToByteEncoder ,你可以基于它们实现自己的编码器。 WebSocket08FrameEncoder
类提供了一个很好的实例。你可以在 io.netty.handler.codec.http.websocketx
包中找到它。
抽象类 MessageToMessageEncoder
MessageToMessageEncoder 和 MessageToMessageDecoder 只是数据方向相反,MessageToMessageEncoder 使用 encode() 方法将出站数据从一种消息转换为另一种消息:
encode(ChannelHandlerContext ctx, I msg, List<Object> out)
:这是你需要实现的唯一方法。每个通过write()
方法写入的消息都将会被传递给encode()
方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被发给 ChannelPipeline 中的下一个 ChannelOutboundHandler
下面示例,编码器将每个出站 Integer 的 String 表示添加到该 List 中:
public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
// 将 Integer 转换为 String ,并将其添加到 List 中
out.add(String.valueOf(msg));
}
}
关于更多 MessageToMessageEncoder 的专业用法,请查看 io.netty.handler.codec.protobuf.ProtobufEncoder
类,它处理了由 Google 的 Protocol Buffers
规范所定义的数据格式。
抽象的编解码器类
虽然我们一直将解码器和编码器分开讨论,但是你有时会发现在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/编码器对,以处理这两种类型。这些抽象的编解码器类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,但是为了最大化代码的可重用和可扩展性,我们一般还是会分开使用编码器和解码器。
抽象类 ByteToMessageCodec
假如我们需要将字节解码为某种形式,可能是 POJO ,随后再次对它进行编码。ByteToMessageCodec
将为我们处理好这一切,因为它结合了 ByteToMessageDecoder 以及它的逆向 MessageToByteEncoder 。
任何的请求/响应协议都可以作为使用 ByteToMessageCodec 的理想选择。
ByteToMessageCodec API :
decode(ChannelHandlerContext ctx, ByteBuf in, List)
:只要有字节可以被消费,这个方法就将会被调用。它将入站 ByteBuf 转换为指定的消息格式,并将其转发给 ChannelPipeline 中的下一个 ChannelInboundHandlerdecodeLast(ChannelHandlerContext ctx, ByteBuf in, List out)
:这个方法的默认实现委托给了 decode() 方法。它只会在 Channel 的状态变为非活动时被调用一次。它可以被重写以实现特殊的处理encode(ChannelHandlerContext ctx, I msg, ByteBuf out)
:对于每个将被编码并写入出站 ByteBuf 的(类型为 I 的) 消息来说,这个方法都将会被调用
抽象类 MessageToMessageCodec
MessageToMessageCodec 是一个参数化的类,定义如下:
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
MessageToMessageCodec API :
protected abstract decode(ChannelHandlerContext ctx, INBOUND_IN msg, List out)
:这个方法被调用时会被传入 INBOUND_IN 类型的消息。 它将把它们解码为 OUTBOUND_IN 类型的消息,这些消息将被转发给 ChannelPipeline 中的下一个 ChannelInboundHandlerprotected abstract encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List out)
:对于每个 OUTBOUND_IN 类型的消息,这个方法都将会被调用。这些消息将会被编码为 INBOUND_IN 类型的消息,然后被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler
这个编解码器主要用于在两种不同的消息 API 之间来回转换数据。
下面的示例是很常见的 MessageToMessageCodec 用例:
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
@Override
protected void encode(ChannelHandlerContext ctx, WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out) throws Exception {
ByteBuf payload = msg.getData().duplicate().retain();
switch (msg.getType()) {
case BINARY:
out.add(new BinaryWebSocketFrame(payload));
break;
case TEXT:
out.add(new TextWebSocketFrame(payload));
break;
case CLOSE:
out.add(new CloseWebSocketFrame(true, 0, payload));
break;
case CONTINUATION:
out.add(new ContinuationWebSocketFrame(payload));
break;
case PONG:
out.add(new PongWebSocketFrame(payload));
break;
case PING:
out.add(new PingWebSocketFrame(payload));
break;
default:
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
ByteBuf payload = msg.content().duplicate().retain();
if (msg instanceof BinaryWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, payload));
} else if (msg instanceof CloseWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, payload));
} else if (msg instanceof PingWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, payload));
} else if (msg instanceof PongWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, payload));
} else if (msg instanceof TextWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, payload));
} else if (msg instanceof ContinuationWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, payload));
} else {
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
// 声明 WebSocketConvertHandler 所使用的 OUTBOUND_IN 类型
public static final class MyWebSocketFrame {
// 定义拥有被包装的有效负载的 WebSocketFrame 的类型
public enum FrameType {
BINARY,
CLOSE,
PING,
PONG,
TEXT,
CONTINUATION
}
private final FrameType type;
private final ByteBuf data;
public MyWebSocketFrame(FrameType type, ByteBuf data) {
this.type = type;
this.data = data;
}
public FrameType getType() {
return type;
}
public ByteBuf getData() {
return data;
}
}
}
CombinedChannelDuplexHandler 类
CombinedChannelDuplexHandler
类充当了 ChannelInboundHandler 和 ChannelOutboundHandler 的容器。通过提供分别继承了解码器类和编码器类的类型,我们可以实现一个编解码器,而又不必直接扩展抽象的编解码器类。
字节流解码为 Char 类型:
public class ByteToCharDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= 2) {
out.add(in.readChar());
}
}
}
Char 类型编码为字节流:
public class CharToByteEncoder extends MessageToByteEncoder<Character> {
@Override
public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
out.writeChar(msg);
}
}
将上面解码器和编码器结合起来,构建一个编解码器:
public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public CombinedByteCharCodec() {
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}
通过这种方式结合实现相对于十月编解码器类的方式来说可能更加简单也更加的灵活。