ChannelHandler和ChannelPipeline-Netty笔记(六)

/ 0评 / 0

ChannelHandler 家族

Channel 的生命周期

Channel 接口定义了一组和 ChannelInboundHandler API密切相关的简单但功能强大的状态模型。Channel 的 4 个状态:

Channel 的状态模型

当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler ,其可以随后对它们做出相应。

ChannelHandler 的生命周期

在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用下面这些方法:

Netty 定义了下面两个重要的 ChannelHandler 子接口:

ChannelInboundHandler 接口

ChannelInboundHandler 的生命周期方法:

当某个 ChannelInboundHandler 的实现重写 channelRead() 方法时,它将负责显式地释放与池化的 ByteBuf 实例相关的内存。释放消息资源:

@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 丢弃已接收的消息
        ReferenceCountUtil.release(msg); 
    }
}

Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很频繁,所以 Netty 提供了一个更加简单的方式,使用 SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。

public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { 
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        // 不需要任何显式的资源释放
    } 
}

ChannelOutboundHandler 接口

出站操作和数据将由 ChannelOutboundHandler 处理,它的方法将被 Channel 、 ChannelPipeline 以及 ChannelHandlerContext 调用。

ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。

ChannelOutboundHandler 中大部分方法都需要一个 ChannelPromise 参数,以便在操作完成时得到通知。ChannelPromise 是 ChannelFuture 的一个子类,其定义了一些可写的方法,如 setSuccess()setFailure() ,从而使 ChannelFuture 不可变。借鉴了 Scala 的 Promise 和 Future 的设计,当一个 Promise 被完成之后,其对应的 Future 的值便不能再进行任何修改了。

ChannelHandler 适配器

可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类作为自己的 ChannelHandler 的起始点。这两个适配器分别提供了 ChannelInboundHandler 和 ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter ,它们获得了它们共同的超接口 ChannelHandler 的方法。

ChannelHandlerAdapter 类的层次结构

ChannelHandlerAdapter 还提供了实用方法 isSharable() 。如果其对应的实现被标注 @Sharable 注解,那么这个方法将返回 true ,表示它可以被添加到多个 ChannelPipeline 中。

在 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 中所提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法,从而将事件转发到了 ChannelPipeline 中的下一个 ChannelHandler 中。

资源管理

每当通过调用 ChannelInboundHandler.channelRead() 或者 ChannelOutboundHandler.write() 方法来处理数据时,你都需要确保没有任何的资源泄露。Netty 提供了 ResourceLeakDetector 类来帮助诊断潜在的资源泄露问题,它将对你的应用程序的缓冲区分配大约 1% 的采样来检测内存泄露。如果检测到内存泄露,将会产生类似下面的日志消息:

LEAK: ByteBuf.release() was not called before it's garbage-collected. 
Enable advanced leak reporting to find out where the leak occurred. 
To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=ADVANCED' or call ResourceLeakDetector.setLevel().

Netty 目前定义了 4 种泄露检测级别:

检测泄露级别可以通过 Java 系统属性设置为上面 4 种级别的一个值来定义:

java -Dio.netty.leakDetectionLevel=ADVANCED

如果带着该 JVM 选项重启你的应用程序,你将看到自己的应用程序最近被泄露的缓冲区被访问的位置。一个典型的由单元测试产生的泄露报告:

Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157) io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages( 
... 

消费并显式释放入站消息:

@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 释放资源
        ReferenceCountUtil.release(msg); 
    }
}

由于消费入站数据是一项常规的任务,所以 Netty 提供了一个特殊的被称为 SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在消息被 channelRead0() 方法消费之后自动释放消息。

丢弃并显式释放出站消息:

@Sharable
public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { 
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { 
        // 释放资源
        ReferenceCountUtil.release(msg);
        // 通知 ChannelPromise 数据已经被处理了
        promise.setSuccess();
    } 
}

不仅要释放资源,还需要通知 ChannelPromise ,否则可能出现 ChannelFutureListener 收不到某个消息已经被处理了的通知情况。

总之,如果一个消息被消费或者丢弃了,并且没有传递给 ChannelPipeline 中的下一个 ChannelOutboundHandler ,那么用户就有责任调用 ReferenceCountUtil.release() 。如果消息到达了实际的传输层,那么当它被下入时或者 Channel 关闭时,都将被自动释放。

ChannelPipeline 接口

每创建一个新的 Channel 都会被分配一个新的 ChannelPipeline ,这项关联是永久性的。Channel 既不能附加另外一个 ChannelPipeline ,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler 处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一个超类型的下一个 ChannelHandler 。

ChannelHandlerContext 使得 ChannelHandler 能够和它的 ChannelPipeline 以及其他的 ChannelHandler 交互。 ChannelHandler 可以通知其所属的 ChannelPipeline 中的下一个 ChannelHandler ,甚至可以动态修改它所属的 ChannelPipeline 。

典型的同时具有入站和出站 ChannelHandler 的 ChannelPipeline 的布局:

ChannelPipeline 和它的 ChannelHandler

修改 ChannelPipeline

ChannelHandler 可以通过添加、删除或者替换其他的 ChannelHandler 来实时地修改 ChannelPipeline 的布局。它也可以把它自己从 ChannelPipeline 中移除。

修改 ChannelPipeline :

ChannelPipeline pipeline = ..;
// 创建一个 FirstHandler 的实例
FirstHandler firstHandler = new FirstHandler(); 
// 将该实例作为 "handler1" 添加到 ChannelPipeline 中
pipeline.addLast("handler1", firstHandler); 
// 将一个 SecondHandler 的实例作为 "handler2" 添加到 ChannelPipeline 的第一个槽中。这意味着它将被放置在已有的 "handler1" 之前
pipeline.addFirst("handler2", new SecondHandler()); 
// 将一个 ThirdHandler 的实例作为 "handler3" 添加到 ChannelPipeline 的最后一个槽中
pipeline.addLast("handler3", new ThirdHandler());
...
// 通过名称移除 "handler3"
pipeline.remove("handler3"); 
// 通过引用移除 FirstHandler
pipeline.remove(firstHandler); 
// 将 SecondHandler("handler2") 替换为 ForthHandler("handler4")
pipeline.replace("handler2", "handler4", new ForthHandler());

触发事件

ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。

ChannelPipeline 的入站操作:

ChannelPipeline 的出站操作:

总结:

ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext 。ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。而调用位于 ChannelHandlerContext 上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。

ChannelHandlerContext API:

使用 ChannelHandlerContext 的 API 时,要记住两点:

ChannelHandlerContext 和 ChannelHandler 之间的关联是永远不会改变的,所以缓存对它是安全的。

相对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

使用 ChannelHandlerContext

下图说明了 ChannelPipeline 、 ChannelHandlerContext 、 ChannelHandler 和 Channel 的关系:

通过 ChannelHandlerContext 获取到 Channel 的引用,调用 Channel 上的 write() 方法将会导致写入事件从尾端到头部地流经 ChannelPipeline :

ChannelHandlerContext ctx = ..;
// 获取到与 ChannelHandlerContext 相关联的 Channel 的引用
Channel channel = ctx.channel();
// 通过 Channel 写入缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

通过 ChannelHandlerContext 获取到 ChannelPipeline 的引用,并调用 ChannelPipeline 上的 write() 方法写入,也会导致写入事件从尾端到头部地流经 ChannelPipeline :

ChannelHandlerContext ctx = ..;
// 获取到与 ChannelHandlerContext 相关联的 ChannelPipeline 的引用
ChannelPipeline pipeline = ctx.pipeline(); 
// 通过 ChannelPipeline 写入缓冲区
pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

虽然被调用的 Channel 或 ChannelPipeline 上的 write() 方法将一直传播事件通过整个 ChannelPipeline ,但是在 ChannelHandler 的级别上,事件从一个 ChannelHandler 到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 完成的。传递流程如下:

代码中如何传递:

// 获取到 ChannelHandlerContext 的引用
ChannelHandlerContext ctx = ..;
// write() 方法将把缓冲区数据发送到下一个 ChannelHandler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

调用特定的 ChannelHandler 实例关联的 ChannelHandlerContext 的方法,绕过了前面所有的 ChannelHandler :

ChannelHandler 和 ChannelHandlerContext 高级用法

ChannelHandlerContext 可以缓存引用以供稍后使用:

public class WriteHandler extends ChannelHandlerAdapter {

    // 存储 ChannelHandlerContext 的引用以供稍后使用
    private ChannelHandlerContext ctx;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    // 使用之前存储的 ChannelHandlerContext 的引用来发送消息
    public void send(String msg) {
        ctx.writeAndFlush(msg);
    }
}

因为一个 ChannelHandler 可以从属于多个 ChannelPipeline ,所以 ChannelHandler 可以绑定到多个 ChannelHandlerContext 实例,但需要使用 @Sharable 注解标注 ChannelHandler :

// 使用注解 @Sharable 标注这个 ChannelHandler 可以被多个 ChannelPipeline 添加
@Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Channel read message: " + msg);
        // 记录方法调用,并转发给下一个 ChannelHandler
        ctx.fireChannelRead(msg);
    }
}

下面这段代码将有线程安全的问题:

@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {

    private int count;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 将 count 字段值加 1
        count++;
        System.out.println("channelRead(...) called the " + count + " time");
        // 记录方法调用,并转发给下一个 ChannelHandler
        ctx.fireChannelRead(msg);
    } 
}

总之,只应该在确定了 ChannelHandler 是线程安全时才使用 @Sharable 注解。

异常处理

处理入站异常

处理入站事件的过程中发生的异常,需要重写 ChannelInboundHandler 中的 exceptionCaught() 方法。需要注意的是,异常和入站事件一样,发生异常后也会继续按照入站的方向流动,所以通常应该在 ChannelPipeline 的最后处理入站异常,这确保了所有的入站异常都会被处理到,无论异常发生在 ChannelPipeline 中的什么位置。

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter { 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    } 
}

处理出站异常

每一个出站操作都将返回一个 ChannelFuture ,注册到 ChannelFuture 的 ChannelFutureListener 将在操作完成时被通知,我们可以在通知方法中判断该操作成功还是出错了。

几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise(ChannelFuture 的子类)的实例,这个实例提供了一些立即通知的可写方法:

ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);

细致的异常处理:通过调用出站操作(如 write() 方法)返回的 ChannelFuture 的 addListener() 方法来添加监听器:

ChannelFuture future = channel.write(someMessage); future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) {
    if (!f.isSuccess()) {
        f.cause().printStackTrace();
        f.channel().close();
    }
}); 

一般的异常处理:将 ChannelFutureListener 添加到即将作为参数传递给 ChannelOutboundHandler 的方法 ChannelPromise :

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter { 
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                if (!f.isSuccess()) { f.cause().printStackTrace();
                    f.channel().close();
                }
            }); 
        } 
    }
}

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注