ChannelHandler和ChannelPipeline-Netty笔记(六)

ChannelHandler 家族

Channel 的生命周期

Channel 接口定义了一组和 ChannelInboundHandler API密切相关的简单但功能强大的状态模型。Channel 的 4 个状态:

  • ChannelUnregistered:Channel已经被创建,但还未注册到 EventLoop。
  • ChannelRegistered:Channel已经被注册到了 EventLoop。
  • ChannelActive:Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了。
  • ChannelInactive:Channel 没有连接到远程节点。
ChannelHandler和ChannelPipeline-Netty笔记(六)
Channel 的状态模型

当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler ,其可以随后对它们做出相应。

ChannelHandler 的生命周期

在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用下面这些方法:

  • handlerAdded:当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
  • handlerRemoved:当从 ChannelPipeline 中移除 ChannelHandler 时被调用
  • exceptionCaught:当处理过程中在 ChannelPipeline 中有错误产生时被调用

Netty 定义了下面两个重要的 ChannelHandler 子接口:

  • ChannelInboundHandler:处理入站数据以及各种状态变化。
  • ChannelOutboundHandler:处理出站数据并允许拦截。

ChannelInboundHandler 接口

ChannelInboundHandler 的生命周期方法:

  • channelRegistered:当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
  • channelUnregistered:当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调用
  • channelActive:当 Channel 处于活动状态时被调用,Channel 已经连接/绑定并且已经就绪
  • channelInactive:当 Channel 离开活动状态并且不再连接它的远程节点时被调用
  • channelReadComplete:当 Channel 上的一个读操作完成时被调用。当所有可读的字节都已经从 Channel 中读取之后,将会调用该回调方法。所以,可能在 channelReadComplete() 被调用之前看到多次调用 channelRead() 方法
  • channelRead:当从 Channel 读取数据时被调用
  • ChannelWritabilityChanged:当 Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生 OutOfMemoryError)或者可以在 Channel 变为再次可写时恢复写入。可以通过调用Channel的isWritable()方法来检测 Channel 的可写性。与可写性相关的阈值可以通过 Channel.config().setWriteHighWaterMark() 和 Channel.config().setWriteLowWaterMark() 方法来设置
  • userEventTriggered:当 ChannelnboundHandler.fireUserEventTriggered() 方法被调用时被调用,因为一个 POJO 被传经了 ChannelPipeline

当某个 ChannelInboundHandler 的实现重写 channelRead() 方法时,它将负责显式地释放与池化的 ByteBuf 实例相关的内存。释放消息资源:

Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很频繁,所以 Netty 提供了一个更加简单的方式,使用 SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。

ChannelOutboundHandler 接口

出站操作和数据将由 ChannelOutboundHandler 处理,它的方法将被 Channel 、 ChannelPipeline 以及 ChannelHandlerContext 调用。

ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。

  • bind(ChannelHandlerContext, SocketAddress, ChannelPromise):当请求将 Channel 绑定到本地地址时被调用
  • connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise):当请求将 Channel 连接到远程节点时被调用
  • disconnect(ChannelHandlerContext, ChannelPromise):当请求将 Channel 从远程节点断开时被调用
  • close(ChannelHandlerContext, ChannelPromise):当请求关闭 Channel 时被调用
  • deregister(ChannelHandlerContext, ChannelPromise):当请求将 Channel 从它的 EventLoop 注销时被调用
  • read(ChannelHandlerContext):当请求从 Channel 读取更多的数据时被调用
  • flush(ChannelHandlerContext):当请求通过 Channel 将入队数据冲刷到远程节点时被调用
  • write(ChannelHandlerContext, Object, ChannelPromise):当请求通过 Channel 将数据写到远程节点时被调用

ChannelOutboundHandler 中大部分方法都需要一个 ChannelPromise 参数,以便在操作完成时得到通知。ChannelPromise 是 ChannelFuture 的一个子类,其定义了一些可写的方法,如 setSuccess()setFailure() ,从而使 ChannelFuture 不可变。借鉴了 Scala 的 Promise 和 Future 的设计,当一个 Promise 被完成之后,其对应的 Future 的值便不能再进行任何修改了。

ChannelHandler 适配器

可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类作为自己的 ChannelHandler 的起始点。这两个适配器分别提供了 ChannelInboundHandler 和 ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter ,它们获得了它们共同的超接口 ChannelHandler 的方法。

ChannelHandler和ChannelPipeline-Netty笔记(六)
ChannelHandlerAdapter 类的层次结构

ChannelHandlerAdapter 还提供了实用方法 isSharable() 。如果其对应的实现被标注 @Sharable 注解,那么这个方法将返回 true ,表示它可以被添加到多个 ChannelPipeline 中。

在 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 中所提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法,从而将事件转发到了 ChannelPipeline 中的下一个 ChannelHandler 中。

资源管理

每当通过调用 ChannelInboundHandler.channelRead() 或者 ChannelOutboundHandler.write() 方法来处理数据时,你都需要确保没有任何的资源泄露。Netty 提供了 ResourceLeakDetector 类来帮助诊断潜在的资源泄露问题,它将对你的应用程序的缓冲区分配大约 1% 的采样来检测内存泄露。如果检测到内存泄露,将会产生类似下面的日志消息:

Netty 目前定义了 4 种泄露检测级别:

  • DISABLED:禁用泄漏检测。只有在详尽的测试之后才应设置为这个值
  • SIMPLE:使用 1% 的默认采样率检测并报告任何发现的泄露。这是默认级别,适合绝大部分的情况
  • ADVANCED:使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置
  • PARANOID:类似于ADVANCED,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很大的影响,应该只在调试阶段使用

检测泄露级别可以通过 Java 系统属性设置为上面 4 种级别的一个值来定义:

如果带着该 JVM 选项重启你的应用程序,你将看到自己的应用程序最近被泄露的缓冲区被访问的位置。一个典型的由单元测试产生的泄露报告:

消费并显式释放入站消息:

由于消费入站数据是一项常规的任务,所以 Netty 提供了一个特殊的被称为 SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在消息被 channelRead0() 方法消费之后自动释放消息。

丢弃并显式释放出站消息:

不仅要释放资源,还需要通知 ChannelPromise ,否则可能出现 ChannelFutureListener 收不到某个消息已经被处理了的通知情况。

总之,如果一个消息被消费或者丢弃了,并且没有传递给 ChannelPipeline 中的下一个 ChannelOutboundHandler ,那么用户就有责任调用 ReferenceCountUtil.release() 。如果消息到达了实际的传输层,那么当它被下入时或者 Channel 关闭时,都将被自动释放。

ChannelPipeline 接口

每创建一个新的 Channel 都会被分配一个新的 ChannelPipeline ,这项关联是永久性的。Channel 既不能附加另外一个 ChannelPipeline ,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler 处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一个超类型的下一个 ChannelHandler 。

ChannelHandlerContext 使得 ChannelHandler 能够和它的 ChannelPipeline 以及其他的 ChannelHandler 交互。 ChannelHandler 可以通知其所属的 ChannelPipeline 中的下一个 ChannelHandler ,甚至可以动态修改它所属的 ChannelPipeline 。

典型的同时具有入站和出站 ChannelHandler 的 ChannelPipeline 的布局:

ChannelHandler和ChannelPipeline-Netty笔记(六)
ChannelPipeline 和它的 ChannelHandler

修改 ChannelPipeline

ChannelHandler 可以通过添加、删除或者替换其他的 ChannelHandler 来实时地修改 ChannelPipeline 的布局。它也可以把它自己从 ChannelPipeline 中移除。

  • addFirst/addBefore/addAfter/addLst:将一个 ChannelHandler 添加到 ChannelPipeline 中。
  • remove:将一个 ChannelHandler 从 ChannelPipeline 中移除。
  • replace:将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler 。
  • get:通过类型或者名称返回 ChannelHandler 。
  • context:返回和 ChannelHandler 绑定的 ChannelHandlerContext 。
  • names:返回 ChannelPipeline 中所有 ChannelHandler 的名称。

修改 ChannelPipeline :

触发事件

ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。

ChannelPipeline 的入站操作:

  • fireChannelRegistered:调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelRegistered(ChannelHandlerContext)方法
  • fireChannelUnregistered:调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelUnregistered(ChannelHandlerContext)方法
  • fireChannelActive:调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelActive(ChannelHandlerContext)方法
  • fireChannelInactive:调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelInactive(ChannelHandlerContext)方法
  • fireExceptionCaught:调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 exceptionCaught(ChannelHandlerContext, Throwable)方法
  • fireUserEventTriggered:调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 userEventTriggered(ChannelHandlerContext, Object)方法
  • fireChannelRead:调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelRead(ChannelHandlerContext, Object msg)方法
  • fireChannelReadComplete:调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelReadComplete(ChannelHandlerContext)方法
  • fireChannelWritabilityChanged:调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelWritabilityChanged(ChannelHandlerContext)方法

ChannelPipeline 的出站操作:

  • bind:将 Channel 绑定到一个本地地址,这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 bind(ChannelHandlerContext, SocketAddress, ChannelPromise) 方法
  • connect:将 Channel 连接到一个远程地址,这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 connect(ChannelHandlerContext, SocketAddress, ChannelPromise) 方法
  • disconnect:将 Channel 断开连接。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 disconnect(ChannelHandlerContext, Channel Promise) 方法
  • close:将 Channel 关闭。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 close(ChannelHandlerContext, ChannelPromise) 方法
  • deregister:将 Channel 从它先前所分配的 EventExecutor(即 EventLoop) 中注销。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 deregister(ChannelHandlerContext, ChannelPromise) 方法
  • flush:冲刷 Channel 所有挂起的写入。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 flush(ChannelHandlerContext) 方法
  • write:将消息写入 Channel。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 write(ChannelHandlerContext, Object msg, ChannelPromise)方法。注意:这并不会将消息写入底层的 Socket,而只会将它放入队列中。要将它写入 Socket,需要调用 flush() 或者 writeAndFlush() 方法
  • writeAndFlush:这是一个先调用 write() 方法再接着调用 flush() 方法的便利方法
  • read:请求从 Channel 中读取更多的数据。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 read(ChannelHandlerContext) 方法

总结:

  • ChannelPipeline 保存了与 Channel 相关联的 ChannelHandler 。
  • ChannelPipeline 可以根据需要,通过添加或者删除 ChannelHandler 来动态地修改。
  • ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件。

ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext 。ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。而调用位于 ChannelHandlerContext 上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。

ChannelHandlerContext API:

  • alloc:返回和这个实例相关联的 Channel 所配置的 ByteBufAllocator
  • bind:绑定到给定的 SocketAddress,并返回 ChannelFuture
  • channel:返回绑定到这个实例的 Channel
  • close:关闭 Channel,并返回 ChannelFuture
  • connect:连接给定的 SocketAddress,并返回 ChannelFuture
  • deregister:从之前分配的 EventExecutor 注销,并返回 ChannelFuture
  • disconnect:从远程节点断开,并返回 ChannelFuture
  • executor:返回调度事件的 EventExecutor
  • fireChannelActive:触发对下一个 ChannelInboundHandler 上的 channelActive()方法(已连接)的调用
  • fireChannelInactive:触发对下一个 ChannelInboundHandler 上的 channelInactive()方法(已关闭)的调用
  • fireChannelRead:触发对下一个 ChannelInboundHandler 上的 channelRead()方法(已接收的消息)的调用
  • fireChannelReadComplete:触发对下一个 ChannelInboundHandler 上的 channelReadComplete()方法的调用
  • fireChannelRegistered:触发对下一个 ChannelInboundHandler 上的 fireChannelRegistered()方法的调用
  • fireChannelUnregistered:触发对下一个 ChannelInboundHandler 上的 fireChannelUnregistered()方法的调用
  • fireChannelWritabilityChanged:触发对下一个ChannelInboundHandler上的 fireChannelWritabilityChanged()方法的调用
  • fireExceptionCaught:触发对下一个 ChannelInboundHandler 上的 fireExceptionCaught(Throwable)方法的调用
  • fireUserEventTriggered:触发对下一个 ChannelInboundHandler 上的 fireUserEventTriggered(Object evt)方法的调用
  • handler:返回绑定到这个实例的 ChannelHandler
  • isRemoved:如果所关联的 ChannelHandler 已经被从 ChannelPipeline 中移除则返回 true
  • name:返回这个实例的唯一名称
  • pipeline:返回这个实例所关联的 ChannelPipeline
  • read:将数据从Channel读取到第一个入站缓冲区;如果读取成功则触 发一个channelRead事件,并(在最后一个消息被读取完成后)通知 ChannelInboundHandler 的 channelReadComplete (ChannelHandlerContext)方法
  • write:通过这个实例写入消息并经过 ChannelPipeline
  • writeAndFlush:通过这个实例写入并冲刷消息并经过 ChannelPipeline

使用 ChannelHandlerContext 的 API 时,要记住两点:

ChannelHandlerContext 和 ChannelHandler 之间的关联是永远不会改变的,所以缓存对它是安全的。

相对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

使用 ChannelHandlerContext

下图说明了 ChannelPipeline 、 ChannelHandlerContext 、 ChannelHandler 和 Channel 的关系:

ChannelHandler和ChannelPipeline-Netty笔记(六)

通过 ChannelHandlerContext 获取到 Channel 的引用,调用 Channel 上的 write() 方法将会导致写入事件从尾端到头部地流经 ChannelPipeline :

通过 ChannelHandlerContext 获取到 ChannelPipeline 的引用,并调用 ChannelPipeline 上的 write() 方法写入,也会导致写入事件从尾端到头部地流经 ChannelPipeline :

虽然被调用的 Channel 或 ChannelPipeline 上的 write() 方法将一直传播事件通过整个 ChannelPipeline ,但是在 ChannelHandler 的级别上,事件从一个 ChannelHandler 到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 完成的。传递流程如下:

ChannelHandler和ChannelPipeline-Netty笔记(六)

代码中如何传递:

调用特定的 ChannelHandler 实例关联的 ChannelHandlerContext 的方法,绕过了前面所有的 ChannelHandler :

ChannelHandler和ChannelPipeline-Netty笔记(六)

ChannelHandler 和 ChannelHandlerContext 高级用法

ChannelHandlerContext 可以缓存引用以供稍后使用:

因为一个 ChannelHandler 可以从属于多个 ChannelPipeline ,所以 ChannelHandler 可以绑定到多个 ChannelHandlerContext 实例,但需要使用 @Sharable 注解标注 ChannelHandler :

下面这段代码将有线程安全的问题:

总之,只应该在确定了 ChannelHandler 是线程安全时才使用 @Sharable 注解。

异常处理

处理入站异常

处理入站事件的过程中发生的异常,需要重写 ChannelInboundHandler 中的 exceptionCaught() 方法。需要注意的是,异常和入站事件一样,发生异常后也会继续按照入站的方向流动,所以通常应该在 ChannelPipeline 的最后处理入站异常,这确保了所有的入站异常都会被处理到,无论异常发生在 ChannelPipeline 中的什么位置。

处理出站异常

每一个出站操作都将返回一个 ChannelFuture ,注册到 ChannelFuture 的 ChannelFutureListener 将在操作完成时被通知,我们可以在通知方法中判断该操作成功还是出错了。

几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise(ChannelFuture 的子类)的实例,这个实例提供了一些立即通知的可写方法:

细致的异常处理:通过调用出站操作(如 write() 方法)返回的 ChannelFuture 的 addListener() 方法来添加监听器:

一般的异常处理:将 ChannelFutureListener 添加到即将作为参数传递给 ChannelOutboundHandler 的方法 ChannelPromise :

    A+
发布日期:2019年04月19日  所属分类:Java
标签:
六阿哥

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: