编解码器-Netty笔记(十)

/ 0

什么是编解码器

每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种换换逻辑由编解码器处理,编解码器由编码器解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。

如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列,那么编码器是将消息转换为适合于传输的格式(最有可能是字节流)。而对于的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,解码器处理入站数据

解码器

解码器实现了 ChannelInboundHandler ,并将入站数据从一种格式转换为另一种格式。 Netty 提供了两个不同用例的解码器:

每当需要为 ChannelPipeline 中的下一个 ChannelInboundHandler 转换入站数据时会用到解码器,并且可以将多个解码器链接在一起,以实现任意复杂的转换逻辑。

抽象类 ByteToMessageDecoder

由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。ByteToMessageDecoder 有两个最重要的方法:

假设你接收了一个包含简单 int 的字节流,每个 int 都需要被单独处理。在这种情况下,你需要从入站 ByteBuf 中读取每个 int ,并将它传递给 ChannelPipeline 中的下一个 ChannelInboundHandler 。为了解码这个字节流,你要扩展 ByteToMessageDecoder 类。(原子类型的 int 在被添加到 List 中时,会被自动装箱为 Integer)

下图每次从入站 ByteBuf 中读取 4 字节,将其解码为一个 int ,然后将它添加到一个 List 中。当没有更多的元素可以被添加到该 List 中时,它的内容将会被发送给下一个 ChannelInboundHandler 。

ToIntegerDecoder 类扩展了 ByteToMessageDecoder :

public class ToIntegerDecoder extends ByteToMessageDecoder {

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 检查是否至少有 4 字节可读
        if (in.readableBytes() >= 4) {
            // 从入站 ByteBuf 中读取一个 int ,并将其添加到解码消息的 List 中
            out.add(in.readInt());
        }
    }
}

抽象类 ReplayingDecoder

ReplayingDecoder 扩展了 ByteToMessageDecoder 类,使得我们不必调用 readableBytes() 方法。它通过使用一个自定义的 ByteBuf 实现,ReplayingDecoderByteBuf ,包装传入的 ByteBuf 实现了这一点,其将在内部调用 readableBytes() 方法。

这个类的完整声明是:

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

类型参数 S 指定了用于状态管理的类型,其中 Void 代表不需要状态管理。

ToIntegerDecoder2 类扩展了 ReplayingDecoder :

public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {

    /**
     * @param in  传入的 ByteBuf 是 ReplayingDecoderByteBuf
     */
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 从入站 ByteBuf 中读取一个 int ,并将其添加到解码消息的 List 中
        out.add(in.readInt());
    }
}

ReplayingDecoder 虽然简化了操作,但是性能稍低于 ByteToMessageDecoder 。所以,如果解码器复杂,则使用 ReplayingDecoder 以简化操作,否则使用 ByteToMessageDecoder 以提高性能。

更多的解码器:

io.netty.handler.codec 子包下面,有更多用于特定用例的编码器和解码器实现。

抽象类 MessageToMessageDecoder

MessageToMessageDecoder 用于两个消息格式之间进行转换(例如,从一种 POJO 类型转换为另一种):

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

类型 I 指定了 decode() 方法的输入参数 msg 的类型,它是你必须实现 的唯一方法。

编写一个 IntegerToStringDecoder 解码器来扩展 MessageToMessageDecoder<Integer> 。它的 decode() 方法会把 Integer 参数转换为它的 String 表示,并有了下面的签名:

public void decode( ChannelHandlerContext ctx, Integer msg, List<Object> out ) throws Exception

IntegerToStringDecoder 类:

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
    @Override
    public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

有关更加复杂的例子,请研究 io.netty.handler.codec.http.HttpObjectAggregator 类,它扩展了 MessageToMessageDecoder<HttpObject>

TooLongFrameException 类

由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提供了 TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出。

如果你正在使用一个可变帧大小的协议,那么这种波阿虎措施将是尤为重要的,下面是一个简单的例子:

public class SafeByteToMessageDecoder extends ByteToMessageDecoder {

    private static final int MAX_FRAME_SIZE = 1024;

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readable = in.readableBytes();
        // 检查缓冲区是否有超过 MAX_FRAME_SIZE 个字节
        if (readable > MAX_FRAME_SIZE) {
            // 跳过所有的可读字节,抛出 TooLongFrameException 并通知 ChannelHandler
            in.skipBytes(readable);
            throw new TooLongFrameException("Frame too big!");
        }

        //...
    }
}

编码器

编码器实现了 ChannelOutboundHandler ,并将出站数据从一种格式转换为另一种格式,和我们方才学习的解码器的功能正好相反。Netty 提供了一组类,用于帮助你编写具有以下功能的编码器:

抽象类 MessageToByteEncoder

MessageToByteEncoder 和 ByteToMessageDecoder 做的事情刚好相反,并且只有一个 encode() 方法:

下面例子接受一个 Short 类型的实例作为消息,将它编码为 Short 的原子类型值,并将它写入 ByteBuf 中,其将随后被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler :

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        // 将 Short 写入 ByteBuf 中
        out.writeShort(msg);
    }
}

Netty 提供了一些专门化的 MessageToByteEncoder ,你可以基于它们实现自己的编码器。 WebSocket08FrameEncoder 类提供了一个很好的实例。你可以在 io.netty.handler.codec.http.websocketx 包中找到它。

抽象类 MessageToMessageEncoder

MessageToMessageEncoder 和 MessageToMessageDecoder 只是数据方向相反,MessageToMessageEncoder 使用 encode() 方法将出站数据从一种消息转换为另一种消息:

下面示例,编码器将每个出站 Integer 的 String 表示添加到该 List 中:

public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
    @Override
    public void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        // 将 Integer 转换为 String ,并将其添加到 List 中
        out.add(String.valueOf(msg));
    }
}

关于更多 MessageToMessageEncoder 的专业用法,请查看 io.netty.handler.codec.protobuf.ProtobufEncoder 类,它处理了由 Google 的 Protocol Buffers 规范所定义的数据格式。

抽象的编解码器类

虽然我们一直将解码器和编码器分开讨论,但是你有时会发现在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/编码器对,以处理这两种类型。这些抽象的编解码器类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,但是为了最大化代码的可重用和可扩展性,我们一般还是会分开使用编码器和解码器。

抽象类 ByteToMessageCodec

假如我们需要将字节解码为某种形式,可能是 POJO ,随后再次对它进行编码。ByteToMessageCodec 将为我们处理好这一切,因为它结合了 ByteToMessageDecoder 以及它的逆向 MessageToByteEncoder 。

任何的请求/响应协议都可以作为使用 ByteToMessageCodec 的理想选择

ByteToMessageCodec API :

抽象类 MessageToMessageCodec

MessageToMessageCodec 是一个参数化的类,定义如下:

public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>

MessageToMessageCodec API :

这个编解码器主要用于在两种不同的消息 API 之间来回转换数据。

下面的示例是很常见的 MessageToMessageCodec 用例:

public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {

    @Override
    protected void encode(ChannelHandlerContext ctx, WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf payload = msg.getData().duplicate().retain();
        switch (msg.getType()) {
            case BINARY:
                out.add(new BinaryWebSocketFrame(payload));
                break;
            case TEXT:
                out.add(new TextWebSocketFrame(payload));
                break;
            case CLOSE:
                out.add(new CloseWebSocketFrame(true, 0, payload));
                break;
            case CONTINUATION:
                out.add(new ContinuationWebSocketFrame(payload));
                break;
            case PONG:
                out.add(new PongWebSocketFrame(payload));
                break;
            case PING:
                out.add(new PingWebSocketFrame(payload));
                break;
            default:
                throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf payload = msg.content().duplicate().retain();
        if (msg instanceof BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, payload));
        } else if (msg instanceof CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, payload));
        } else if (msg instanceof PingWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, payload));
        } else if (msg instanceof PongWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, payload));
        } else if (msg instanceof TextWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, payload));
        } else if (msg instanceof ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, payload));
        } else {
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    // 声明 WebSocketConvertHandler 所使用的 OUTBOUND_IN 类型
    public static final class MyWebSocketFrame {

        // 定义拥有被包装的有效负载的 WebSocketFrame 的类型
        public enum FrameType {
            BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION
        }

        private final FrameType type;
        private final ByteBuf data;

        public MyWebSocketFrame(FrameType type, ByteBuf data) {
            this.type = type;
            this.data = data;
        }

        public FrameType getType() {
            return type;
        }

        public ByteBuf getData() {
            return data;
        }
    }
}

CombinedChannelDuplexHandler 类

CombinedChannelDuplexHandler 类充当了 ChannelInboundHandler 和 ChannelOutboundHandler 的容器。通过提供分别继承了解码器类和编码器类的类型,我们可以实现一个编解码器,而又不必直接扩展抽象的编解码器类。

字节流解码为 Char 类型:

public class ByteToCharDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= 2) {
            out.add(in.readChar());
        }
    }
}

Char 类型编码为字节流:

public class CharToByteEncoder extends MessageToByteEncoder<Character> {
    @Override
    public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
        out.writeChar(msg);
    }
}

将上面解码器和编码器结合起来,构建一个编解码器:

public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
    public CombinedByteCharCodec() {
        super(new ByteToCharDecoder(), new CharToByteEncoder());
    }
}

通过这种方式结合实现相对于十月编解码器类的方式来说可能更加简单也更加的灵活。